1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::io::{Error, ErrorKind};
18use std::iter;
19use std::net::{IpAddr, Ipv4Addr, SocketAddr};
20use std::sync::atomic::{AtomicI32, Ordering};
21use std::sync::{Arc, Weak};
22use std::time::{Duration, Instant};
23
24use anyhow::anyhow;
25use bytes::Bytes;
26use either::Either;
27use itertools::Itertools;
28use parking_lot::{Mutex, RwLock, RwLockReadGuard};
29use pgwire::error::{PsqlError, PsqlResult};
30use pgwire::net::{Address, AddressRef};
31use pgwire::pg_field_descriptor::PgFieldDescriptor;
32use pgwire::pg_message::TransactionStatus;
33use pgwire::pg_response::{PgResponse, StatementType};
34use pgwire::pg_server::{
35 BoxedError, ExecContext, ExecContextGuard, Session, SessionId, SessionManager,
36 UserAuthenticator,
37};
38use pgwire::types::{Format, FormatIterator};
39use prometheus_http_query::Client as PrometheusClient;
40use rand::RngCore;
41use risingwave_batch::monitor::{BatchSpillMetrics, GLOBAL_BATCH_SPILL_METRICS};
42use risingwave_batch::spill::spill_op::SpillOp;
43use risingwave_batch::task::{ShutdownSender, ShutdownToken};
44use risingwave_batch::worker_manager::worker_node_manager::{
45 WorkerNodeManager, WorkerNodeManagerRef,
46};
47use risingwave_common::acl::AclMode;
48#[cfg(test)]
49use risingwave_common::catalog::{
50 DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
51};
52use risingwave_common::config::{
53 AuthMethod, BatchConfig, ConnectionType, FrontendConfig, MetaConfig, MetricLevel,
54 RpcClientConfig, StreamingConfig, UdfConfig, load_config,
55};
56use risingwave_common::id::WorkerId;
57use risingwave_common::memory::MemoryContext;
58use risingwave_common::secret::LocalSecretManager;
59use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
60use risingwave_common::system_param::local_manager::{
61 LocalSystemParamsManager, LocalSystemParamsManagerRef,
62};
63use risingwave_common::telemetry::manager::TelemetryManager;
64use risingwave_common::telemetry::telemetry_env_enabled;
65use risingwave_common::types::DataType;
66use risingwave_common::util::addr::HostAddr;
67use risingwave_common::util::cluster_limit;
68use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
69use risingwave_common::util::iter_util::ZipEqFast;
70use risingwave_common::util::pretty_bytes::convert;
71use risingwave_common::util::runtime::BackgroundShutdownRuntime;
72use risingwave_common::{GIT_SHA, RW_VERSION};
73use risingwave_common_heap_profiling::HeapProfiler;
74use risingwave_common_service::{MetricsManager, ObserverManager};
75use risingwave_connector::source::monitor::{GLOBAL_SOURCE_METRICS, SourceMetrics};
76use risingwave_pb::common::WorkerType;
77use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
78use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
79use risingwave_pb::health::health_server::HealthServer;
80use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
81use risingwave_pb::user::auth_info::EncryptionType;
82use risingwave_rpc_client::{
83 ComputeClientPool, ComputeClientPoolRef, FrontendClientPool, FrontendClientPoolRef, MetaClient,
84 MonitorClientPool, MonitorClientPoolRef,
85};
86use risingwave_sqlparser::ast::{ObjectName, Statement};
87use risingwave_sqlparser::parser::Parser;
88use thiserror::Error;
89use thiserror_ext::AsReport;
90use tokio::runtime::Builder;
91use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
92use tokio::sync::oneshot::Sender;
93use tokio::sync::watch;
94use tokio::task::JoinHandle;
95use tracing::{error, info};
96
97use self::cursor_manager::CursorManager;
98use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError};
99use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl};
100use crate::catalog::connection_catalog::ConnectionCatalog;
101use crate::catalog::root_catalog::{Catalog, SchemaPath};
102use crate::catalog::secret_catalog::SecretCatalog;
103use crate::catalog::source_catalog::SourceCatalog;
104use crate::catalog::subscription_catalog::SubscriptionCatalog;
105use crate::catalog::{
106 CatalogError, CatalogErrorInner, DatabaseId, OwnedByUserCatalog, SchemaId, TableId,
107 check_schema_writable,
108};
109use crate::error::{
110 ErrorCode, Result, RwError, bail_catalog_error, bail_permission_denied, bail_protocol_error,
111};
112use crate::handler::describe::infer_describe;
113use crate::handler::extended_handle::{
114 Portal, PrepareStatement, handle_bind, handle_execute, handle_parse,
115};
116use crate::handler::privilege::ObjectCheckItem;
117use crate::handler::show::{infer_show_create_object, infer_show_object};
118use crate::handler::util::to_pg_field;
119use crate::handler::variable::infer_show_variable;
120use crate::handler::{RwPgResponse, handle};
121use crate::health_service::HealthServiceImpl;
122use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl};
123use crate::monitor::{CursorMetrics, FrontendMetrics, GLOBAL_FRONTEND_METRICS};
124use crate::observer::FrontendObserverNode;
125use crate::rpc::{FrontendServiceImpl, MonitorServiceImpl};
126use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef};
127use crate::scheduler::{
128 DistributedQueryMetrics, GLOBAL_DISTRIBUTED_QUERY_METRICS, HummockSnapshotManager,
129 HummockSnapshotManagerRef, QueryManager,
130};
131use crate::telemetry::FrontendTelemetryCreator;
132use crate::user::UserId;
133use crate::user::user_authentication::md5_hash_with_salt;
134use crate::user::user_manager::UserInfoManager;
135use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl};
136use crate::{FrontendOpts, PgResponseStream, TableCatalog};
137
138pub(crate) mod current;
139pub(crate) mod cursor_manager;
140pub(crate) mod transaction;
141
/// The global, process-wide environment of the frontend node, shared by all
/// sessions.
///
/// Cloning is cheap: fields are handles (`Arc`s / ref-counted pools) or small
/// config values.
#[derive(Clone)]
pub(crate) struct FrontendEnv {
    /// RPC client to the meta service.
    meta_client: Arc<dyn FrontendMetaClient>,
    /// Writer for catalog DDL; obtain it via `catalog_writer()` with a write guard.
    catalog_writer: Arc<dyn CatalogWriter>,
    /// Read access to the locally replicated catalog.
    catalog_reader: CatalogReader,
    /// Writer for user/privilege DDL; obtain it via `user_info_writer()` with a write guard.
    user_info_writer: Arc<dyn UserInfoWriter>,
    /// Read access to the locally replicated user info.
    user_info_reader: UserInfoReader,
    /// Tracks the cluster's worker nodes for scheduling.
    worker_node_manager: WorkerNodeManagerRef,
    /// Manages distributed batch query execution.
    query_manager: QueryManager,
    /// Manages Hummock snapshots used for reads.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
    /// Local view of cluster-wide system parameters.
    system_params_manager: LocalSystemParamsManagerRef,
    /// Default session parameters applied cluster-wide (updated via observer).
    session_params: Arc<RwLock<SessionConfig>>,

    /// The advertised address of this frontend node.
    server_addr: HostAddr,
    /// Pool of RPC clients to compute nodes.
    client_pool: ComputeClientPoolRef,
    /// Pool of RPC clients to other frontend nodes.
    frontend_client_pool: FrontendClientPoolRef,
    /// Pool of RPC clients for monitoring endpoints.
    monitor_client_pool: MonitorClientPoolRef,

    /// All currently live sessions, keyed by their two-part `SessionId`.
    sessions_map: SessionMapRef,

    pub frontend_metrics: Arc<FrontendMetrics>,

    pub cursor_metrics: Arc<CursorMetrics>,

    source_metrics: Arc<SourceMetrics>,

    spill_metrics: Arc<BatchSpillMetrics>,

    batch_config: BatchConfig,
    frontend_config: FrontendConfig,
    #[expect(dead_code)]
    meta_config: MetaConfig,
    streaming_config: StreamingConfig,
    udf_config: UdfConfig,

    /// Tracks streaming jobs that are still being created, so they can be cancelled.
    creating_streaming_job_tracker: StreamingJobTrackerRef,

    /// Dedicated runtime for local batch execution; shut down in the background on drop.
    compute_runtime: Arc<BackgroundShutdownRuntime>,

    /// Root memory context for batch query memory accounting.
    mem_context: MemoryContext,

    /// Memory budget for DataFusion spillable operators (datafusion feature only).
    #[cfg(feature = "datafusion")]
    df_spillable_budget_ctx: MemoryContext,

    /// Address of the serverless backfill controller; empty when not configured.
    serverless_backfill_controller_addr: String,

    /// Client to the configured Prometheus endpoint, if any; `None` when unset
    /// or when the endpoint failed to parse at startup.
    prometheus_client: Option<PrometheusClient>,

    /// Label selector appended to Prometheus queries (may be empty).
    prometheus_selector: String,
}
211
/// Shared map of all live sessions on this frontend, keyed by the two-part
/// `SessionId` `(i32, i32)`.
pub type SessionMapRef = Arc<RwLock<HashMap<(i32, i32), Arc<SessionImpl>>>>;

/// Fraction of the frontend's total memory budget reserved for batch execution.
const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5;
217
218impl FrontendEnv {
219 pub fn mock() -> Self {
220 use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter};
221
222 let catalog = Arc::new(RwLock::new(Catalog::default()));
223 let meta_client = Arc::new(MockFrontendMetaClient {});
224 let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
225 let catalog_writer = Arc::new(MockCatalogWriter::new(
226 catalog.clone(),
227 hummock_snapshot_manager.clone(),
228 ));
229 let catalog_reader = CatalogReader::new(catalog);
230 let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
231 let user_info_writer = Arc::new(MockUserInfoWriter::new(user_info_manager.clone()));
232 let user_info_reader = UserInfoReader::new(user_info_manager);
233 let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
234 let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
235 let compute_client_pool = Arc::new(ComputeClientPool::for_test());
236 let frontend_client_pool = Arc::new(FrontendClientPool::for_test());
237 let monitor_client_pool = Arc::new(MonitorClientPool::for_test());
238 let query_manager = QueryManager::new(
239 worker_node_manager.clone(),
240 compute_client_pool,
241 catalog_reader.clone(),
242 Arc::new(DistributedQueryMetrics::for_test()),
243 None,
244 None,
245 );
246 let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
247 let client_pool = Arc::new(ComputeClientPool::for_test());
248 let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
249 let runtime = {
250 let mut builder = Builder::new_multi_thread();
251 if let Some(frontend_compute_runtime_worker_threads) =
252 load_config("", FrontendOpts::default())
253 .batch
254 .frontend_compute_runtime_worker_threads
255 {
256 builder.worker_threads(frontend_compute_runtime_worker_threads);
257 }
258 builder
259 .thread_name("rw-batch-local")
260 .enable_all()
261 .build()
262 .unwrap()
263 };
264 let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
265 let sessions_map = Arc::new(RwLock::new(HashMap::new()));
266 Self {
267 meta_client,
268 catalog_writer,
269 catalog_reader,
270 user_info_writer,
271 user_info_reader,
272 worker_node_manager,
273 query_manager,
274 hummock_snapshot_manager,
275 system_params_manager,
276 session_params: Default::default(),
277 server_addr,
278 client_pool,
279 frontend_client_pool,
280 monitor_client_pool,
281 sessions_map,
282 frontend_metrics: Arc::new(FrontendMetrics::for_test()),
283 cursor_metrics: Arc::new(CursorMetrics::for_test()),
284 batch_config: BatchConfig::default(),
285 frontend_config: FrontendConfig::default(),
286 meta_config: MetaConfig::default(),
287 streaming_config: StreamingConfig::default(),
288 udf_config: UdfConfig::default(),
289 source_metrics: Arc::new(SourceMetrics::default()),
290 spill_metrics: BatchSpillMetrics::for_test(),
291 creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
292 compute_runtime,
293 mem_context: MemoryContext::none(),
294 #[cfg(feature = "datafusion")]
295 df_spillable_budget_ctx: MemoryContext::none(),
296 serverless_backfill_controller_addr: Default::default(),
297 prometheus_client: None,
298 prometheus_selector: String::new(),
299 }
300 }
301
302 pub async fn init(opts: FrontendOpts) -> Result<(Self, Vec<JoinHandle<()>>, Vec<Sender<()>>)> {
303 let config = load_config(&opts.config_path, &opts);
304 info!("Starting frontend node");
305 info!("> config: {:?}", config);
306 info!(
307 "> debug assertions: {}",
308 if cfg!(debug_assertions) { "on" } else { "off" }
309 );
310 info!("> version: {} ({})", RW_VERSION, GIT_SHA);
311
312 let frontend_address: HostAddr = opts
313 .advertise_addr
314 .as_ref()
315 .unwrap_or_else(|| {
316 tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
317 &opts.listen_addr
318 })
319 .parse()
320 .unwrap();
321 info!("advertise addr is {}", frontend_address);
322
323 let rpc_addr: HostAddr = opts.frontend_rpc_listener_addr.parse().unwrap();
324 let internal_rpc_host_addr = HostAddr {
325 host: frontend_address.host.clone(),
327 port: rpc_addr.port,
328 };
329 let (meta_client, system_params_reader) = MetaClient::register_new(
331 opts.meta_addr,
332 WorkerType::Frontend,
333 &frontend_address,
334 AddWorkerNodeProperty {
335 internal_rpc_host_addr: internal_rpc_host_addr.to_string(),
336 ..Default::default()
337 },
338 Arc::new(config.meta.clone()),
339 )
340 .await;
341
342 let worker_id = meta_client.worker_id();
343 info!("Assigned worker node id {}", worker_id);
344
345 let (heartbeat_join_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
346 meta_client.clone(),
347 Duration::from_millis(config.server.heartbeat_interval_ms as u64),
348 );
349 let mut join_handles = vec![heartbeat_join_handle];
350 let mut shutdown_senders = vec![heartbeat_shutdown_sender];
351
352 let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
353 let hummock_snapshot_manager =
354 Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));
355
356 let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
357 let catalog = Arc::new(RwLock::new(Catalog::default()));
358 let catalog_writer = Arc::new(CatalogWriterImpl::new(
359 meta_client.clone(),
360 catalog_updated_rx.clone(),
361 hummock_snapshot_manager.clone(),
362 ));
363 let catalog_reader = CatalogReader::new(catalog.clone());
364
365 let worker_node_manager = Arc::new(WorkerNodeManager::new());
366
367 let compute_client_pool = Arc::new(ComputeClientPool::new(
368 config.batch_exchange_connection_pool_size(),
369 config.batch.developer.compute_client_config.clone(),
370 ));
371 let frontend_client_pool = Arc::new(FrontendClientPool::new(
372 1,
373 config.batch.developer.frontend_client_config.clone(),
374 ));
375 let monitor_client_pool = Arc::new(MonitorClientPool::new(1, RpcClientConfig::default()));
376 let query_manager = QueryManager::new(
377 worker_node_manager.clone(),
378 compute_client_pool.clone(),
379 catalog_reader.clone(),
380 Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
381 config.batch.distributed_query_limit,
382 config.batch.max_batch_queries_per_frontend_node,
383 );
384
385 let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
386 let user_info_reader = UserInfoReader::new(user_info_manager.clone());
387 let user_info_writer = Arc::new(UserInfoWriterImpl::new(
388 meta_client.clone(),
389 catalog_updated_rx,
390 ));
391
392 let system_params_manager =
393 Arc::new(LocalSystemParamsManager::new(system_params_reader.clone()));
394
395 LocalSecretManager::init(
396 opts.temp_secret_file_dir,
397 meta_client.cluster_id().to_owned(),
398 worker_id,
399 );
400
401 let session_params = Arc::new(RwLock::new(SessionConfig::default()));
403 let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
404 let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));
405
406 let frontend_observer_node = FrontendObserverNode::new(
407 worker_node_manager.clone(),
408 catalog,
409 catalog_updated_tx,
410 user_info_manager,
411 hummock_snapshot_manager.clone(),
412 system_params_manager.clone(),
413 session_params.clone(),
414 compute_client_pool.clone(),
415 );
416 let observer_manager =
417 ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
418 .await;
419 let observer_join_handle = observer_manager.start().await;
420 join_handles.push(observer_join_handle);
421
422 meta_client.activate(&frontend_address).await?;
423
424 let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
425 let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
426 let spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());
427
428 if config.server.metrics_level > MetricLevel::Disabled {
429 MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
430 }
431
432 let health_srv = HealthServiceImpl::new();
433 let frontend_srv = FrontendServiceImpl::new(sessions_map.clone());
434 let monitor_srv = MonitorServiceImpl::new(config.server.clone());
435 let frontend_rpc_addr = opts.frontend_rpc_listener_addr.parse().unwrap();
436
437 let telemetry_manager = TelemetryManager::new(
438 Arc::new(meta_client.clone()),
439 Arc::new(FrontendTelemetryCreator::new()),
440 );
441
442 if config.server.telemetry_enabled && telemetry_env_enabled() {
445 let (join_handle, shutdown_sender) = telemetry_manager.start().await;
446 join_handles.push(join_handle);
447 shutdown_senders.push(shutdown_sender);
448 } else {
449 tracing::info!("Telemetry didn't start due to config");
450 }
451
452 tokio::spawn(async move {
453 tonic::transport::Server::builder()
454 .add_service(HealthServer::new(health_srv))
455 .add_service(FrontendServiceServer::new(frontend_srv))
456 .add_service(MonitorServiceServer::new(monitor_srv))
457 .serve(frontend_rpc_addr)
458 .await
459 .unwrap();
460 });
461 info!(
462 "Health Check RPC Listener is set up on {}",
463 opts.frontend_rpc_listener_addr.clone()
464 );
465
466 let creating_streaming_job_tracker =
467 Arc::new(StreamingJobTracker::new(frontend_meta_client.clone()));
468
469 let runtime = {
470 let mut builder = Builder::new_multi_thread();
471 if let Some(frontend_compute_runtime_worker_threads) =
472 config.batch.frontend_compute_runtime_worker_threads
473 {
474 builder.worker_threads(frontend_compute_runtime_worker_threads);
475 }
476 builder
477 .thread_name("rw-batch-local")
478 .enable_all()
479 .build()
480 .unwrap()
481 };
482 let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
483
484 let sessions = sessions_map.clone();
485 let join_handle = tokio::spawn(async move {
487 let mut check_idle_txn_interval =
488 tokio::time::interval(core::time::Duration::from_secs(10));
489 check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
490 check_idle_txn_interval.reset();
491 loop {
492 check_idle_txn_interval.tick().await;
493 sessions.read().values().for_each(|session| {
494 let _ = session.check_idle_in_transaction_timeout();
495 })
496 }
497 });
498 join_handles.push(join_handle);
499
500 #[cfg(not(madsim))]
502 if config.batch.enable_spill {
503 SpillOp::clean_spill_directory()
504 .await
505 .map_err(|err| anyhow!(err))?;
506 }
507
508 let total_memory_bytes = opts.frontend_total_memory_bytes;
509 let heap_profiler =
510 HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
511 heap_profiler.start();
513
514 let batch_memory_limit = total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION;
515 let mem_context = MemoryContext::root(
516 frontend_metrics.batch_total_mem.clone(),
517 batch_memory_limit as u64,
518 );
519 #[cfg(feature = "datafusion")]
520 let df_spillable_budget_ctx =
521 crate::datafusion::create_df_spillable_budget_ctx(&mem_context);
522
523 let prometheus_client = if let Some(ref endpoint) = opts.prometheus_endpoint {
525 match PrometheusClient::try_from(endpoint.as_str()) {
526 Ok(client) => {
527 info!("Prometheus client initialized with endpoint: {}", endpoint);
528 Some(client)
529 }
530 Err(e) => {
531 error!(
532 error = %e.as_report(),
533 "failed to initialize Prometheus client",
534 );
535 None
536 }
537 }
538 } else {
539 None
540 };
541
542 let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
544
545 info!(
546 "Frontend total_memory: {} batch_memory: {}",
547 convert(total_memory_bytes as _),
548 convert(batch_memory_limit as _),
549 );
550
551 Ok((
552 Self {
553 catalog_reader,
554 catalog_writer,
555 user_info_reader,
556 user_info_writer,
557 worker_node_manager,
558 meta_client: frontend_meta_client,
559 query_manager,
560 hummock_snapshot_manager,
561 system_params_manager,
562 session_params,
563 server_addr: frontend_address,
564 client_pool: compute_client_pool,
565 frontend_client_pool,
566 monitor_client_pool,
567 frontend_metrics,
568 cursor_metrics,
569 spill_metrics,
570 sessions_map,
571 batch_config: config.batch,
572 frontend_config: config.frontend,
573 meta_config: config.meta,
574 streaming_config: config.streaming,
575 serverless_backfill_controller_addr: opts.serverless_backfill_controller_addr,
576 udf_config: config.udf,
577 source_metrics,
578 creating_streaming_job_tracker,
579 compute_runtime,
580 mem_context,
581 #[cfg(feature = "datafusion")]
582 df_spillable_budget_ctx,
583 prometheus_client,
584 prometheus_selector,
585 },
586 join_handles,
587 shutdown_senders,
588 ))
589 }
590
591 fn catalog_writer(&self, _guard: transaction::WriteGuard) -> &dyn CatalogWriter {
596 &*self.catalog_writer
597 }
598
599 pub fn catalog_reader(&self) -> &CatalogReader {
601 &self.catalog_reader
602 }
603
604 fn user_info_writer(&self, _guard: transaction::WriteGuard) -> &dyn UserInfoWriter {
609 &*self.user_info_writer
610 }
611
612 pub fn user_info_reader(&self) -> &UserInfoReader {
614 &self.user_info_reader
615 }
616
617 pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef {
618 self.worker_node_manager.clone()
619 }
620
621 pub fn meta_client(&self) -> &dyn FrontendMetaClient {
622 &*self.meta_client
623 }
624
625 pub fn meta_client_ref(&self) -> Arc<dyn FrontendMetaClient> {
626 self.meta_client.clone()
627 }
628
629 pub fn query_manager(&self) -> &QueryManager {
630 &self.query_manager
631 }
632
633 pub fn hummock_snapshot_manager(&self) -> &HummockSnapshotManagerRef {
634 &self.hummock_snapshot_manager
635 }
636
637 pub fn system_params_manager(&self) -> &LocalSystemParamsManagerRef {
638 &self.system_params_manager
639 }
640
641 pub fn session_params_snapshot(&self) -> SessionConfig {
642 self.session_params.read_recursive().clone()
643 }
644
645 pub fn sbc_address(&self) -> &String {
646 &self.serverless_backfill_controller_addr
647 }
648
649 pub fn prometheus_client(&self) -> Option<&PrometheusClient> {
651 self.prometheus_client.as_ref()
652 }
653
654 pub fn prometheus_selector(&self) -> &str {
656 &self.prometheus_selector
657 }
658
659 pub fn server_address(&self) -> &HostAddr {
660 &self.server_addr
661 }
662
663 pub fn client_pool(&self) -> ComputeClientPoolRef {
664 self.client_pool.clone()
665 }
666
667 pub fn frontend_client_pool(&self) -> FrontendClientPoolRef {
668 self.frontend_client_pool.clone()
669 }
670
671 pub fn monitor_client_pool(&self) -> MonitorClientPoolRef {
672 self.monitor_client_pool.clone()
673 }
674
675 pub fn batch_config(&self) -> &BatchConfig {
676 &self.batch_config
677 }
678
679 pub fn frontend_config(&self) -> &FrontendConfig {
680 &self.frontend_config
681 }
682
683 pub fn streaming_config(&self) -> &StreamingConfig {
684 &self.streaming_config
685 }
686
687 pub fn udf_config(&self) -> &UdfConfig {
688 &self.udf_config
689 }
690
691 pub fn source_metrics(&self) -> Arc<SourceMetrics> {
692 self.source_metrics.clone()
693 }
694
695 pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
696 self.spill_metrics.clone()
697 }
698
699 pub fn creating_streaming_job_tracker(&self) -> &StreamingJobTrackerRef {
700 &self.creating_streaming_job_tracker
701 }
702
703 pub fn sessions_map(&self) -> &SessionMapRef {
704 &self.sessions_map
705 }
706
707 pub fn compute_runtime(&self) -> Arc<BackgroundShutdownRuntime> {
708 self.compute_runtime.clone()
709 }
710
711 pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {
714 cancel_queries_in_session(session_id, self.sessions_map.clone())
715 }
716
717 pub fn cancel_creating_jobs_in_session(&self, session_id: SessionId) -> bool {
720 cancel_creating_jobs_in_session(session_id, self.sessions_map.clone())
721 }
722
723 pub fn mem_context(&self) -> MemoryContext {
724 self.mem_context.clone()
725 }
726
727 #[cfg(feature = "datafusion")]
728 pub fn df_spillable_budget_ctx(&self) -> MemoryContext {
729 self.df_spillable_budget_ctx.clone()
730 }
731}
732
/// The authenticated identity of a session: which database it is connected to
/// and as which user.
#[derive(Clone)]
pub struct AuthContext {
    /// Name of the database the session is connected to.
    pub database: String,
    /// Name of the authenticated user.
    pub user_name: String,
    /// Id of the authenticated user.
    pub user_id: UserId,
}
739
740impl AuthContext {
741 pub fn new(database: String, user_name: String, user_id: UserId) -> Self {
742 Self {
743 database,
744 user_name,
745 user_id,
746 }
747 }
748}
/// State of a single client session on this frontend node.
pub struct SessionImpl {
    /// Handle to the shared frontend environment.
    env: FrontendEnv,
    /// The session's database/user identity; mutable (e.g. on database switch).
    auth_context: Arc<RwLock<AuthContext>>,
    /// How the client authenticated (or `None` for trust).
    user_authenticator: UserAuthenticator,
    /// Session-local configuration parameters (SET/RESET).
    config_map: Arc<RwLock<SessionConfig>>,

    /// Sender side of the channel carrying notice messages to the client.
    notice_tx: UnboundedSender<String>,
    /// Receiver side of the notice channel, drained when responding.
    notice_rx: Mutex<UnboundedReceiver<String>>,

    /// The session id — a two-part `(i32, i32)` `SessionId`.
    id: (i32, i32),

    /// Address of the connected client.
    peer_addr: AddressRef,

    /// Current transaction state (see the `transaction` module).
    txn: Arc<Mutex<transaction::State>>,

    /// Shutdown handle for the query currently running in this session, if any.
    current_query_cancel_flag: Mutex<Option<ShutdownSender>>,

    /// Weak handle to the context of the statement currently executing;
    /// `None`/dangling when the session is idle.
    exec_context: Mutex<Option<Weak<ExecContext>>>,

    /// When the session last became idle; used for idle-in-transaction timeout.
    last_idle_instant: Arc<Mutex<Option<Instant>>>,

    /// Manages the cursors declared in this session.
    cursor_manager: Arc<CursorManager>,

    /// Session-local temporary sources, invisible to other sessions.
    temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,

    /// Session-local staging tables, invisible to other sessions.
    staging_catalog_manager: Arc<Mutex<StagingCatalogManager>>,
}
792
/// Session-local registry of temporary sources, keyed by source name.
#[derive(Default, Clone)]
pub struct TemporarySourceManager {
    // Maps source name to its catalog entry.
    sources: HashMap<String, SourceCatalog>,
}
805
806impl TemporarySourceManager {
807 pub fn new() -> Self {
808 Self {
809 sources: HashMap::new(),
810 }
811 }
812
813 pub fn create_source(&mut self, name: String, source: SourceCatalog) {
814 self.sources.insert(name, source);
815 }
816
817 pub fn drop_source(&mut self, name: &str) {
818 self.sources.remove(name);
819 }
820
821 pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
822 self.sources.get(name)
823 }
824
825 pub fn keys(&self) -> Vec<String> {
826 self.sources.keys().cloned().collect()
827 }
828}
829
/// Session-local registry of staging tables, keyed by table name.
#[derive(Default, Clone)]
pub struct StagingCatalogManager {
    // Maps table name to its catalog entry.
    tables: HashMap<String, TableCatalog>,
}
836
837impl StagingCatalogManager {
838 pub fn new() -> Self {
839 Self {
840 tables: HashMap::new(),
841 }
842 }
843
844 pub fn create_table(&mut self, name: String, table: TableCatalog) {
845 self.tables.insert(name, table);
846 }
847
848 pub fn drop_table(&mut self, name: &str) {
849 self.tables.remove(name);
850 }
851
852 pub fn get_table(&self, name: &str) -> Option<&TableCatalog> {
853 self.tables.get(name)
854 }
855}
856
/// Errors raised while checking whether a relation name already exists:
/// either the qualified name failed to resolve, or the catalog lookup failed.
#[derive(Error, Debug)]
pub enum CheckRelationError {
    // Display forwards to the wrapped error via `{0}`.
    #[error("{0}")]
    Resolve(#[from] ResolveQualifiedNameError),
    #[error("{0}")]
    Catalog(#[from] CatalogError),
}
864
865impl From<CheckRelationError> for RwError {
866 fn from(e: CheckRelationError) -> Self {
867 match e {
868 CheckRelationError::Resolve(e) => e.into(),
869 CheckRelationError::Catalog(e) => e.into(),
870 }
871 }
872}
873
874impl SessionImpl {
875 pub(crate) fn new(
876 env: FrontendEnv,
877 auth_context: AuthContext,
878 user_authenticator: UserAuthenticator,
879 id: SessionId,
880 peer_addr: AddressRef,
881 session_config: SessionConfig,
882 ) -> Self {
883 let cursor_metrics = env.cursor_metrics.clone();
884 let (notice_tx, notice_rx) = mpsc::unbounded_channel();
885
886 Self {
887 env,
888 auth_context: Arc::new(RwLock::new(auth_context)),
889 user_authenticator,
890 config_map: Arc::new(RwLock::new(session_config)),
891 id,
892 peer_addr,
893 txn: Default::default(),
894 current_query_cancel_flag: Mutex::new(None),
895 notice_tx,
896 notice_rx: Mutex::new(notice_rx),
897 exec_context: Mutex::new(None),
898 last_idle_instant: Default::default(),
899 cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
900 temporary_source_manager: Default::default(),
901 staging_catalog_manager: Default::default(),
902 }
903 }
904
905 #[cfg(test)]
906 pub fn mock() -> Self {
907 let env = FrontendEnv::mock();
908 let (notice_tx, notice_rx) = mpsc::unbounded_channel();
909
910 Self {
911 env: FrontendEnv::mock(),
912 auth_context: Arc::new(RwLock::new(AuthContext::new(
913 DEFAULT_DATABASE_NAME.to_owned(),
914 DEFAULT_SUPER_USER.to_owned(),
915 DEFAULT_SUPER_USER_ID,
916 ))),
917 user_authenticator: UserAuthenticator::None,
918 config_map: Default::default(),
919 id: (0, 0),
921 txn: Default::default(),
922 current_query_cancel_flag: Mutex::new(None),
923 notice_tx,
924 notice_rx: Mutex::new(notice_rx),
925 exec_context: Mutex::new(None),
926 peer_addr: Address::Tcp(SocketAddr::new(
927 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
928 8080,
929 ))
930 .into(),
931 last_idle_instant: Default::default(),
932 cursor_manager: Arc::new(CursorManager::new(env.cursor_metrics)),
933 temporary_source_manager: Default::default(),
934 staging_catalog_manager: Default::default(),
935 }
936 }
937
    /// The shared frontend environment this session belongs to.
    pub(crate) fn env(&self) -> &FrontendEnv {
        &self.env
    }

    /// Snapshot of the session's current database/user identity.
    pub fn auth_context(&self) -> Arc<AuthContext> {
        let ctx = self.auth_context.read();
        Arc::new(ctx.clone())
    }

    /// Name of the database the session is currently connected to.
    pub fn database(&self) -> String {
        self.auth_context.read().database.clone()
    }

    /// Id of the session's current database.
    ///
    /// # Panics
    /// Panics if the database no longer exists in the catalog (e.g. dropped
    /// concurrently).
    pub fn database_id(&self) -> DatabaseId {
        let db_name = self.database();
        self.env
            .catalog_reader()
            .read_guard()
            .get_database_by_name(&db_name)
            .map(|db| db.id())
            .expect("session database not found")
    }

    /// Name of the authenticated user.
    pub fn user_name(&self) -> String {
        self.auth_context.read().user_name.clone()
    }

    /// Id of the authenticated user.
    pub fn user_id(&self) -> UserId {
        self.auth_context.read().user_id
    }

    /// Switches the session to another database.
    pub fn update_database(&self, database: String) {
        self.auth_context.write().database = database;
    }

    /// Shared handle to the session's configuration map.
    pub fn shared_config(&self) -> Arc<RwLock<SessionConfig>> {
        Arc::clone(&self.config_map)
    }

    /// Read guard over the session configuration.
    pub fn config(&self) -> RwLockReadGuard<'_, SessionConfig> {
        self.config_map.read()
    }

    /// Sets a session parameter (`SET key = value`) without reporting.
    pub fn set_config(&self, key: &str, value: String) -> Result<String> {
        self.config_map
            .write()
            .set(key, value, &mut ())
            .map_err(Into::into)
    }

    /// Resets a session parameter to its default (`RESET key`).
    pub fn reset_config(&self, key: &str) -> Result<String> {
        self.config_map
            .write()
            .reset(key, &mut ())
            .map_err(Into::into)
    }

    /// Sets (when `value` is `Some`) or resets (when `None`) a session
    /// parameter, notifying the given reporter of the change.
    pub fn set_config_report(
        &self,
        key: &str,
        value: Option<String>,
        mut reporter: impl ConfigReporter,
    ) -> Result<String> {
        if let Some(value) = value {
            self.config_map
                .write()
                .set(key, value, &mut reporter)
                .map_err(Into::into)
        } else {
            self.config_map
                .write()
                .reset(key, &mut reporter)
                .map_err(Into::into)
        }
    }

    /// The session's two-part id.
    pub fn session_id(&self) -> SessionId {
        self.id
    }

    /// SQL text of the statement currently executing, or `None` when idle
    /// (or when the execution context has already been dropped).
    pub fn running_sql(&self) -> Option<Arc<str>> {
        self.exec_context
            .lock()
            .as_ref()
            .and_then(|weak| weak.upgrade())
            .map(|context| context.running_sql.clone())
    }

    /// Manager of the cursors declared in this session.
    pub fn get_cursor_manager(&self) -> Arc<CursorManager> {
        self.cursor_manager.clone()
    }

    /// Address of the connected client.
    pub fn peer_addr(&self) -> &Address {
        &self.peer_addr
    }

    /// Milliseconds since the currently running statement started, or `None`
    /// when no statement is executing.
    pub fn elapse_since_running_sql(&self) -> Option<u128> {
        self.exec_context
            .lock()
            .as_ref()
            .and_then(|weak| weak.upgrade())
            .map(|context| context.last_instant.elapsed().as_millis())
    }

    /// Milliseconds since the session last became idle, or `None` if it has
    /// no recorded idle instant.
    pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
        self.last_idle_instant
            .lock()
            .as_ref()
            .map(|x| x.elapsed().as_millis())
    }
1048
    /// Checks whether a relation with the given (possibly schema-qualified)
    /// name already exists in the current database.
    ///
    /// Returns `Left(())` when creation should proceed. When `if_not_exists`
    /// is set and a duplicate is found, returns `Right(response)` carrying a
    /// "skipping" notice instead of an error; otherwise the catalog error is
    /// propagated.
    pub fn check_relation_name_duplicated(
        &self,
        name: ObjectName,
        stmt_type: StatementType,
        if_not_exists: bool,
    ) -> std::result::Result<Either<(), RwPgResponse>, CheckRelationError> {
        let db_name = &self.database();
        let catalog_reader = self.env().catalog_reader().read_guard();
        let (schema_name, relation_name) = {
            let (schema_name, relation_name) =
                Binder::resolve_schema_qualified_name(db_name, &name)?;
            let search_path = self.config().search_path();
            let user_name = &self.user_name();
            // Unqualified names resolve to the first valid schema on the
            // search path for this user.
            let schema_name = match schema_name {
                Some(schema_name) => schema_name,
                None => catalog_reader
                    .first_valid_schema(db_name, &search_path, user_name)?
                    .name(),
            };
            (schema_name, relation_name)
        };
        match catalog_reader.check_relation_name_duplicated(db_name, &schema_name, &relation_name) {
            Err(e) if if_not_exists => {
                if let CatalogErrorInner::Duplicated {
                    name,
                    under_creation,
                    ..
                } = e.inner()
                {
                    if !*under_creation {
                        // Fully created duplicate: skip with a notice.
                        Ok(Either::Right(
                            PgResponse::builder(stmt_type)
                                .notice(format!("relation \"{}\" already exists, skipping", name))
                                .into(),
                        ))
                    } else if stmt_type == StatementType::CREATE_SUBSCRIPTION {
                        // Subscriptions also skip while the duplicate is still
                        // being created.
                        Ok(Either::Right(
                            PgResponse::builder(stmt_type)
                                .notice(format!(
                                    "relation \"{}\" already exists but still creating, skipping",
                                    name
                                ))
                                .into(),
                        ))
                    } else {
                        // Duplicate still under creation for other statement
                        // types: proceed (Left) rather than skip.
                        Ok(Either::Left(()))
                    }
                } else {
                    Err(e.into())
                }
            }
            Err(e) => Err(e.into()),
            Ok(_) => Ok(Either::Left(())),
        }
    }
1110
1111 pub fn check_secret_name_duplicated(&self, name: ObjectName) -> Result<()> {
1112 let db_name = &self.database();
1113 let catalog_reader = self.env().catalog_reader().read_guard();
1114 let (schema_name, secret_name) = {
1115 let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1116 let search_path = self.config().search_path();
1117 let user_name = &self.user_name();
1118 let schema_name = match schema_name {
1119 Some(schema_name) => schema_name,
1120 None => catalog_reader
1121 .first_valid_schema(db_name, &search_path, user_name)?
1122 .name(),
1123 };
1124 (schema_name, secret_name)
1125 };
1126 catalog_reader
1127 .check_secret_name_duplicated(db_name, &schema_name, &secret_name)
1128 .map_err(RwError::from)
1129 }
1130
1131 pub fn check_connection_name_duplicated(&self, name: ObjectName) -> Result<()> {
1132 let db_name = &self.database();
1133 let catalog_reader = self.env().catalog_reader().read_guard();
1134 let (schema_name, connection_name) = {
1135 let (schema_name, connection_name) =
1136 Binder::resolve_schema_qualified_name(db_name, &name)?;
1137 let search_path = self.config().search_path();
1138 let user_name = &self.user_name();
1139 let schema_name = match schema_name {
1140 Some(schema_name) => schema_name,
1141 None => catalog_reader
1142 .first_valid_schema(db_name, &search_path, user_name)?
1143 .name(),
1144 };
1145 (schema_name, connection_name)
1146 };
1147 catalog_reader
1148 .check_connection_name_duplicated(db_name, &schema_name, &connection_name)
1149 .map_err(RwError::from)
1150 }
1151
    /// Checks whether a function with the same name *and* argument types
    /// already exists in the target schema.
    ///
    /// Returns `Either::Right` with a "skipping" notice when it exists and
    /// `if_not_exists` is set, an error when it exists otherwise, and
    /// `Either::Left(())` when the signature is free.
    pub fn check_function_name_duplicated(
        &self,
        stmt_type: StatementType,
        name: ObjectName,
        arg_types: &[DataType],
        if_not_exists: bool,
    ) -> Result<Either<(), RwPgResponse>> {
        let db_name = &self.database();
        let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
        let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?;

        let catalog_reader = self.env().catalog_reader().read_guard();
        if catalog_reader
            .get_schema_by_id(database_id, schema_id)?
            .get_function_by_name_args(&function_name, arg_types)
            .is_some()
        {
            // Functions are identified by name plus argument types, so the
            // reported name includes the full signature.
            let full_name = format!(
                "{function_name}({})",
                arg_types.iter().map(|t| t.to_string()).join(",")
            );
            if if_not_exists {
                Ok(Either::Right(
                    PgResponse::builder(stmt_type)
                        .notice(format!(
                            "function \"{}\" already exists, skipping",
                            full_name
                        ))
                        .into(),
                ))
            } else {
                Err(CatalogError::duplicated("function", full_name).into())
            }
        } else {
            Ok(Either::Left(()))
        }
    }
1189
    /// Resolves the target database and schema ids for a CREATE statement,
    /// verifying the schema is writable and that the current user has CREATE
    /// privilege on it.
    ///
    /// When `schema_name` is `None`, the first valid schema on the search
    /// path is used.
    pub fn get_database_and_schema_id_for_create(
        &self,
        schema_name: Option<String>,
    ) -> Result<(DatabaseId, SchemaId)> {
        let db_name = &self.database();

        let search_path = self.config().search_path();
        let user_name = &self.user_name();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let schema = match schema_name {
            Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
            None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
        };
        let schema_name = schema.name();

        // Reject creation in read-only/system schemas before the privilege
        // check.
        check_schema_writable(&schema_name)?;
        self.check_privileges(&[ObjectCheckItem::new(
            schema.owner(),
            AclMode::Create,
            schema_name,
            schema.id(),
        )])?;

        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
        Ok((db_id, schema.id()))
    }
1218
    /// Looks up a connection by name (schema-qualified or via the search
    /// path) and checks USAGE privilege for the current user.
    pub fn get_connection_by_name(
        &self,
        schema_name: Option<String>,
        connection_name: &str,
    ) -> Result<Arc<ConnectionCatalog>> {
        let db_name = &self.database();
        let search_path = self.config().search_path();
        let user_name = &self.user_name();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
        let (connection, _) =
            catalog_reader.get_connection_by_name(db_name, schema_path, connection_name)?;

        self.check_privileges(&[ObjectCheckItem::new(
            connection.owner(),
            AclMode::Usage,
            connection.name.clone(),
            connection.id,
        )])?;

        Ok(connection.clone())
    }
1242
1243 pub fn get_subscription_by_schema_id_name(
1244 &self,
1245 schema_id: SchemaId,
1246 subscription_name: &str,
1247 ) -> Result<Arc<SubscriptionCatalog>> {
1248 let db_name = &self.database();
1249
1250 let catalog_reader = self.env().catalog_reader().read_guard();
1251 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1252 let schema = catalog_reader.get_schema_by_id(db_id, schema_id)?;
1253 let subscription = schema
1254 .get_subscription_by_name(subscription_name)
1255 .ok_or_else(|| {
1256 RwError::from(ErrorCode::ItemNotFound(format!(
1257 "subscription {} not found",
1258 subscription_name
1259 )))
1260 })?;
1261 Ok(subscription.clone())
1262 }
1263
1264 pub fn get_subscription_by_name(
1265 &self,
1266 schema_name: Option<String>,
1267 subscription_name: &str,
1268 ) -> Result<Arc<SubscriptionCatalog>> {
1269 let db_name = &self.database();
1270 let search_path = self.config().search_path();
1271 let user_name = &self.user_name();
1272
1273 let catalog_reader = self.env().catalog_reader().read_guard();
1274 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1275 let (subscription, _) =
1276 catalog_reader.get_subscription_by_name(db_name, schema_path, subscription_name)?;
1277 Ok(subscription.clone())
1278 }
1279
1280 pub fn get_table_by_id(&self, table_id: TableId) -> Result<Arc<TableCatalog>> {
1281 let catalog_reader = self.env().catalog_reader().read_guard();
1282 Ok(catalog_reader.get_any_table_by_id(table_id)?.clone())
1283 }
1284
    /// Looks up a *created* table by name within the given database/schema
    /// and checks SELECT privilege for the current user.
    ///
    /// NOTE(review): the not-found case is reported via `std::io::Error`
    /// (converted into the session error type), unlike sibling lookups that
    /// use `ErrorCode::ItemNotFound` — confirm whether this is intentional.
    pub fn get_table_by_name(
        &self,
        table_name: &str,
        db_id: DatabaseId,
        schema_id: SchemaId,
    ) -> Result<Arc<TableCatalog>> {
        let catalog_reader = self.env().catalog_reader().read_guard();
        let table = catalog_reader
            .get_schema_by_id(db_id, schema_id)?
            // Only fully-created tables are visible here.
            .get_created_table_by_name(table_name)
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::InvalidInput,
                    format!("table \"{}\" does not exist", table_name),
                )
            })?;

        self.check_privileges(&[ObjectCheckItem::new(
            table.owner(),
            AclMode::Select,
            table_name.to_owned(),
            table.id,
        )])?;

        Ok(table.clone())
    }
1311
    /// Looks up a secret by name (schema-qualified or via the search path)
    /// and checks USAGE privilege for the current user.
    pub fn get_secret_by_name(
        &self,
        schema_name: Option<String>,
        secret_name: &str,
    ) -> Result<Arc<SecretCatalog>> {
        let db_name = &self.database();
        let search_path = self.config().search_path();
        let user_name = &self.user_name();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
        let (secret, _) = catalog_reader.get_secret_by_name(db_name, schema_path, secret_name)?;

        self.check_privileges(&[ObjectCheckItem::new(
            secret.owner(),
            AclMode::Usage,
            secret.name.clone(),
            secret.id,
        )])?;

        Ok(secret.clone())
    }
1334
    /// Lists up to `max_count` change-log epochs for `table_id` that are
    /// `>= min_epoch` and no newer than the table's committed epoch.
    ///
    /// Returns an empty vec when the table has no committed epoch in the
    /// current hummock snapshot or no change log is found.
    pub async fn list_change_log_epochs(
        &self,
        table_id: TableId,
        min_epoch: u64,
        max_count: u32,
    ) -> Result<Vec<u64>> {
        // The committed epoch bounds how far the change log may be read.
        let Some(max_epoch) = self
            .env
            .hummock_snapshot_manager()
            .acquire()
            .version()
            .state_table_info
            .info()
            .get(&table_id)
            .map(|s| s.committed_epoch)
        else {
            return Ok(vec![]);
        };
        let ret = self
            .env
            .meta_client_ref()
            .get_hummock_table_change_log(
                Some(min_epoch),
                Some(max_epoch),
                Some(iter::once(table_id).collect()),
                true,
                Some(max_count),
            )
            .await?;
        let Some(e) = ret.get(&table_id) else {
            return Ok(vec![]);
        };
        // Re-filter and cap client-side: the meta response may contain epochs
        // outside [min_epoch, max_epoch] or more than max_count entries.
        Ok(e.iter()
            .flat_map(|l| l.epochs())
            .filter(|e| *e >= min_epoch && *e <= max_epoch)
            .take(max_count as usize)
            .collect())
    }
1373
1374 pub fn clear_cancel_query_flag(&self) {
1375 let mut flag = self.current_query_cancel_flag.lock();
1376 *flag = None;
1377 }
1378
1379 pub fn reset_cancel_query_flag(&self) -> ShutdownToken {
1380 let mut flag = self.current_query_cancel_flag.lock();
1381 let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
1382 *flag = Some(shutdown_tx);
1383 shutdown_rx
1384 }
1385
    /// Cancels the query currently running in this session.
    ///
    /// If a local-mode shutdown sender is registered it is consumed and
    /// triggered; otherwise the distributed query manager is asked to cancel
    /// all queries belonging to this session.
    pub fn cancel_current_query(&self) {
        let mut flag_guard = self.current_query_cancel_flag.lock();
        if let Some(sender) = flag_guard.take() {
            info!("Trying to cancel query in local mode.");
            sender.cancel();
            info!("Cancel query request sent.");
        } else {
            info!("Trying to cancel query in distributed mode.");
            self.env.query_manager().cancel_queries_in_session(self.id)
        }
    }
1398
    /// Aborts all streaming jobs this session is currently creating.
    pub fn cancel_current_creating_job(&self) {
        self.env.creating_streaming_job_tracker.abort_jobs(self.id);
    }
1402
    /// Parses and runs a single SQL statement (simple-query path).
    ///
    /// Empty input yields an empty response; more than one statement yields a
    /// notice response without executing anything.
    pub async fn run_statement(
        self: Arc<Self>,
        sql: Arc<str>,
        formats: Vec<Format>,
    ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
        let mut stmts = Parser::parse_sql(&sql)?;
        if stmts.is_empty() {
            return Ok(PgResponse::empty_result(
                pgwire::pg_response::StatementType::EMPTY,
            ));
        }
        if stmts.len() > 1 {
            return Ok(
                PgResponse::builder(pgwire::pg_response::StatementType::EMPTY)
                    .notice("cannot insert multiple commands into statement")
                    .into(),
            );
        }
        let stmt = stmts.swap_remove(0);
        // NOTE(review): `handle` is boxed, presumably to bound this future's
        // size — confirm before changing.
        let rsp = Box::pin(handle(self, stmt, sql.clone(), formats)).await?;
        Ok(rsp)
    }
1428
    /// Queues a notice message to be delivered to the connected client.
    pub fn notice_to_user(&self, str: impl Into<String>) {
        let notice = str.into();
        tracing::trace!(notice, "notice to user");
        self.notice_tx
            .send(notice)
            // The receiving half lives as long as the session itself.
            .expect("notice channel should not be closed");
    }
1436
1437 pub fn is_barrier_read(&self) -> bool {
1438 match self.config().visibility_mode() {
1439 VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
1440 VisibilityMode::All => true,
1441 VisibilityMode::Checkpoint => false,
1442 }
1443 }
1444
1445 pub fn statement_timeout(&self) -> Duration {
1446 if self.config().statement_timeout().millis() == 0 {
1447 Duration::from_secs(self.env.batch_config.statement_timeout_in_sec as u64)
1448 } else {
1449 Duration::from_millis(self.config().statement_timeout().millis() as u64)
1450 }
1451 }
1452
    /// Registers a session-local (temporary) source under its own name.
    pub fn create_temporary_source(&self, source: SourceCatalog) {
        self.temporary_source_manager
            .lock()
            .create_source(source.name.clone(), source);
    }
1458
    /// Returns a clone of the session-local source with the given name, if any.
    pub fn get_temporary_source(&self, name: &str) -> Option<SourceCatalog> {
        self.temporary_source_manager
            .lock()
            .get_source(name)
            .cloned()
    }
1465
    /// Removes the session-local source with the given name (no-op semantics
    /// are delegated to the manager).
    pub fn drop_temporary_source(&self, name: &str) {
        self.temporary_source_manager.lock().drop_source(name);
    }
1469
    /// Returns a snapshot (clone) of the session-local source manager.
    pub fn temporary_source_manager(&self) -> TemporarySourceManager {
        self.temporary_source_manager.lock().clone()
    }
1473
    /// Registers a staging table under its own name in the session-local
    /// staging catalog.
    pub fn create_staging_table(&self, table: TableCatalog) {
        self.staging_catalog_manager
            .lock()
            .create_table(table.name.clone(), table);
    }
1479
    /// Removes the staging table with the given name from the session-local
    /// staging catalog.
    pub fn drop_staging_table(&self, name: &str) {
        self.staging_catalog_manager.lock().drop_table(name);
    }
1483
    /// Returns a snapshot (clone) of the session-local staging catalog manager.
    pub fn staging_catalog_manager(&self) -> StagingCatalogManager {
        self.staging_catalog_manager.lock().clone()
    }
1487
    /// Checks cluster-level resource limits (currently actor count per
    /// parallelism) before proceeding.
    ///
    /// Exceeding the hard limit returns an error; exceeding only the soft
    /// limit sends a notice to the user. The whole check can be skipped via
    /// the `bypass_cluster_limits` session config.
    pub async fn check_cluster_limits(&self) -> Result<()> {
        if self.config().bypass_cluster_limits() {
            return Ok(());
        }

        // Renders the user-facing message for a limit violation; wording
        // differs for hard ("critical") vs. soft ("recommended") limits.
        let gen_message = |ActorCountPerParallelism {
                               worker_id_to_actor_count,
                               hard_limit,
                               soft_limit,
                           }: ActorCountPerParallelism,
                           exceed_hard_limit: bool|
         -> String {
            let (limit_type, action) = if exceed_hard_limit {
                ("critical", "Scale the cluster immediately to proceed.")
            } else {
                (
                    "recommended",
                    "Consider scaling the cluster for optimal performance.",
                )
            };
            format!(
                r#"Actor count per parallelism exceeds the {limit_type} limit.

Depending on your workload, this may overload the cluster and cause performance/stability issues. {action}

HINT:
- For best practices on managing streaming jobs: https://docs.risingwave.com/operate/manage-a-large-number-of-streaming-jobs
- To bypass the check (if the cluster load is acceptable): `[ALTER SYSTEM] SET bypass_cluster_limits TO true`.
  See https://docs.risingwave.com/operate/view-configure-runtime-parameters#how-to-configure-runtime-parameters
- Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.

DETAILS:
- hard limit: {hard_limit}
- soft limit: {soft_limit}
- worker_id_to_actor_count: {worker_id_to_actor_count:?}"#,
            )
        };

        let limits = self.env().meta_client().get_cluster_limits().await?;
        for limit in limits {
            match limit {
                cluster_limit::ClusterLimit::ActorCount(l) => {
                    if l.exceed_hard_limit() {
                        // Hard limit: refuse outright.
                        return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
                            l, true,
                        ))));
                    } else if l.exceed_soft_limit() {
                        // Soft limit: warn but proceed.
                        self.notice_to_user(gen_message(l, false));
                    }
                }
            }
        }
        Ok(())
    }
1542}
1543
/// Process-wide handle to the frontend's session manager, set once during
/// startup and readable from anywhere that needs global session access.
pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
    std::sync::OnceLock::new();
1546
/// Owns all active frontend sessions and the shared [`FrontendEnv`].
pub struct SessionManagerImpl {
    env: FrontendEnv,
    // Handles of background tasks spawned by `FrontendEnv::init`; held so
    // they are tied to the manager's lifetime.
    _join_handles: Vec<JoinHandle<()>>,
    // Shutdown signal senders paired with the background tasks
    // (NOTE(review): presumably signal shutdown when dropped — confirm).
    _shutdown_senders: Vec<Sender<()>>,
    // Monotonic counter used to derive per-session ids / secret keys.
    number: AtomicI32,
}
1553
impl SessionManager for SessionManagerImpl {
    type Error = RwError;
    type Session = SessionImpl;

    /// Creates an internal session (no real client) connected to the given
    /// database as the default super user, with a placeholder peer address.
    fn create_dummy_session(&self, database_id: DatabaseId) -> Result<Arc<Self::Session>> {
        // 0.0.0.0:5691 is a placeholder address; no real connection exists.
        let dummy_addr = Address::Tcp(SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
            5691, ));

        self.connect_inner(
            database_id,
            risingwave_common::catalog::DEFAULT_SUPER_USER,
            Arc::new(dummy_addr),
        )
    }

    /// Opens a session for a real client, resolving the database name to an
    /// id first.
    fn connect(
        &self,
        database: &str,
        user_name: &str,
        peer_addr: AddressRef,
    ) -> Result<Arc<Self::Session>> {
        let catalog_reader = self.env.catalog_reader();
        let reader = catalog_reader.read_guard();
        let database_id = reader.get_database_by_name(database)?.id();

        self.connect_inner(database_id, user_name, peer_addr)
    }

    /// Cancels all running queries of the given session.
    fn cancel_queries_in_session(&self, session_id: SessionId) {
        self.env.cancel_queries_in_session(session_id);
    }

    /// Aborts all streaming jobs the given session is creating.
    fn cancel_creating_jobs_in_session(&self, session_id: SessionId) {
        self.env.cancel_creating_jobs_in_session(session_id);
    }

    /// Removes the session from the session map when the client disconnects.
    fn end_session(&self, session: &Self::Session) {
        self.delete_session(&session.session_id());
    }

    /// Drops all sessions, then unregisters this frontend from the meta node.
    async fn shutdown(&self) {
        self.env.sessions_map().write().clear();
        self.env.meta_client().try_unregister().await;
    }
}
1606
impl SessionManagerImpl {
    /// Initializes the frontend environment and wraps it in a session manager.
    pub async fn new(opts: FrontendOpts) -> Result<Self> {
        let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?;
        Ok(Self {
            env,
            _join_handles: join_handles,
            _shutdown_senders: shutdown_senders,
            number: AtomicI32::new(0),
        })
    }

    /// Read access to the shared frontend environment.
    pub(crate) fn env(&self) -> &FrontendEnv {
        &self.env
    }

    /// Adds a session to the session map and updates the active-session gauge.
    fn insert_session(&self, session: Arc<SessionImpl>) {
        let active_sessions = {
            let mut write_guard = self.env.sessions_map.write();
            write_guard.insert(session.id(), session);
            write_guard.len()
        };
        self.env
            .frontend_metrics
            .active_sessions
            .set(active_sessions as i64);
    }

    /// Removes a session from the session map and updates the gauge.
    fn delete_session(&self, session_id: &SessionId) {
        let active_sessions = {
            let mut write_guard = self.env.sessions_map.write();
            write_guard.remove(session_id);
            write_guard.len()
        };
        self.env
            .frontend_metrics
            .active_sessions
            .set(active_sessions as i64);
    }

    /// Shared connection path for both real clients and dummy sessions.
    ///
    /// Verifies the user exists, may log in, and holds CONNECT privilege on
    /// (or owns) the database; matches the connection against the HBA config
    /// to pick an authentication method; then builds, registers and returns
    /// the new session.
    fn connect_inner(
        &self,
        database_id: DatabaseId,
        user_name: &str,
        peer_addr: AddressRef,
    ) -> Result<Arc<SessionImpl>> {
        let catalog_reader = self.env.catalog_reader();
        let reader = catalog_reader.read_guard();
        let (database_name, database_owner) = {
            let db = reader.get_database_by_id(database_id)?;
            (db.name(), db.owner())
        };

        let user_reader = self.env.user_info_reader();
        let reader = user_reader.read_guard();
        if let Some(user) = reader.get_user_by_name(user_name) {
            if !user.can_login {
                bail_permission_denied!("User {} is not allowed to login", user_name);
            }
            // Superusers and database owners bypass the CONNECT check.
            let has_privilege = user.has_privilege(database_id, AclMode::Connect);
            if !user.is_super && database_owner != user.id && !has_privilege {
                bail_permission_denied!("User does not have CONNECT privilege.");
            }

            // Classify the connection for HBA matching: TCP = Host (with the
            // client IP), unix socket = Local.
            let (connection_type, client_addr) = match peer_addr.as_ref() {
                Address::Tcp(socket_addr) => (ConnectionType::Host, Some(&socket_addr.ip())),
                Address::Unix(_) => (ConnectionType::Local, None),
            };
            tracing::debug!(
                "receive connection: type={:?}, client_addr={:?}",
                connection_type,
                client_addr
            );

            let hba_entry_opt = self.env.frontend_config().hba_config.find_matching_entry(
                &connection_type,
                database_name,
                user_name,
                client_addr,
            );

            // No matching HBA entry means the connection is rejected, same as
            // PostgreSQL.
            let Some(hba_entry_opt) = hba_entry_opt else {
                bail_permission_denied!(
                    "no pg_hba.conf entry for host \"{peer_addr}\", user \"{user_name}\", database \"{database_name}\""
                );
            };

            // Builds an authenticator from the user's stored auth info
            // (plaintext, MD5 with a fresh random salt, or OAuth).
            let authenticator_by_info = || -> Result<UserAuthenticator> {
                let authenticator = match &user.auth_info {
                    None => UserAuthenticator::None,
                    Some(auth_info) => match auth_info.encryption_type() {
                        EncryptionType::Plaintext => {
                            UserAuthenticator::ClearText(auth_info.encrypted_value.clone())
                        }
                        EncryptionType::Md5 => {
                            // A fresh 4-byte salt per connection attempt.
                            let mut salt = [0; 4];
                            let mut rng = rand::rng();
                            rng.fill_bytes(&mut salt);
                            UserAuthenticator::Md5WithSalt {
                                encrypted_password: md5_hash_with_salt(
                                    &auth_info.encrypted_value,
                                    &salt,
                                ),
                                salt,
                            }
                        }
                        EncryptionType::Oauth => UserAuthenticator::OAuth {
                            metadata: auth_info.metadata.clone(),
                            cluster_id: self.env.meta_client().cluster_id().to_owned(),
                        },
                        _ => {
                            bail_protocol_error!(
                                "Unsupported auth type: {}",
                                auth_info.encryption_type().as_str_name()
                            );
                        }
                    },
                };
                Ok(authenticator)
            };

            // The HBA-selected method must be compatible with the user's
            // stored credential type; otherwise authentication fails.
            let user_authenticator = match (&hba_entry_opt.auth_method, &user.auth_info) {
                (AuthMethod::Trust, _) => UserAuthenticator::None,
                (AuthMethod::Password, _) => authenticator_by_info()?,
                (AuthMethod::Md5, Some(auth_info))
                    if auth_info.encryption_type() == EncryptionType::Md5 =>
                {
                    authenticator_by_info()?
                }
                (AuthMethod::OAuth, Some(auth_info))
                    if auth_info.encryption_type() == EncryptionType::Oauth =>
                {
                    authenticator_by_info()?
                }
                (AuthMethod::Ldap, _) => {
                    UserAuthenticator::Ldap(user_name.to_owned(), hba_entry_opt.clone())
                }
                _ => {
                    bail_permission_denied!(
                        "password authentication failed for user \"{user_name}\""
                    );
                }
            };

            // Session ids are derived from a process-local counter; the same
            // value doubles as the cancel secret key.
            let secret_key = self.number.fetch_add(1, Ordering::Relaxed);
            let id = (secret_key, secret_key);
            let session_config = self.env.session_params_snapshot();

            let session_impl: Arc<SessionImpl> = SessionImpl::new(
                self.env.clone(),
                AuthContext::new(database_name.to_owned(), user_name.to_owned(), user.id),
                user_authenticator,
                id,
                peer_addr,
                session_config,
            )
            .into();
            self.insert_session(session_impl.clone());

            Ok(session_impl)
        } else {
            bail_catalog_error!("Role {} does not exist", user_name);
        }
    }
}
1778
impl Session for SessionImpl {
    type Error = RwError;
    type Portal = Portal;
    type PreparedStatement = PrepareStatement;
    type ValuesStream = PgResponseStream;

    /// Runs an already-parsed statement, re-rendering it to SQL text for the
    /// handler.
    async fn run_one_query(
        self: Arc<Self>,
        stmt: Statement,
        format: Format,
    ) -> Result<PgResponse<PgResponseStream>> {
        let string = stmt.to_string();
        let sql_str = string.as_str();
        let sql: Arc<str> = Arc::from(sql_str);
        // The owned `String` is no longer needed once copied into the Arc.
        drop(string);
        let rsp = Box::pin(handle(self, stmt, sql, vec![format])).await?;
        Ok(rsp)
    }

    /// Authenticator chosen for this session at connect time.
    fn user_authenticator(&self) -> &UserAuthenticator {
        &self.user_authenticator
    }

    fn id(&self) -> SessionId {
        self.id
    }

    /// Extended-protocol Parse: an absent statement becomes the empty
    /// prepared statement.
    async fn parse(
        self: Arc<Self>,
        statement: Option<Statement>,
        params_types: Vec<Option<DataType>>,
    ) -> Result<PrepareStatement> {
        Ok(if let Some(statement) = statement {
            handle_parse(self, statement, params_types).await?
        } else {
            PrepareStatement::Empty
        })
    }

    /// Extended-protocol Bind: binds parameters and formats into a portal.
    fn bind(
        self: Arc<Self>,
        prepare_statement: PrepareStatement,
        params: Vec<Option<Bytes>>,
        param_formats: Vec<Format>,
        result_formats: Vec<Format>,
    ) -> Result<Portal> {
        handle_bind(prepare_statement, params, param_formats, result_formats)
    }

    /// Extended-protocol Execute.
    async fn execute(self: Arc<Self>, portal: Portal) -> Result<PgResponse<PgResponseStream>> {
        let rsp = Box::pin(handle_execute(self, portal)).await?;
        Ok(rsp)
    }

    /// Describe (statement): returns parameter types and inferred output
    /// field descriptors.
    fn describe_statement(
        self: Arc<Self>,
        prepare_statement: PrepareStatement,
    ) -> Result<(Vec<DataType>, Vec<PgFieldDescriptor>)> {
        Ok(match prepare_statement {
            PrepareStatement::Empty => (vec![], vec![]),
            PrepareStatement::Prepared(prepare_statement) => (
                prepare_statement.bound_result.param_types,
                infer(
                    Some(prepare_statement.bound_result.bound),
                    prepare_statement.statement,
                )?,
            ),
            PrepareStatement::PureStatement(statement) => (vec![], infer(None, statement)?),
        })
    }

    /// Describe (portal): output descriptors, with columns switched to binary
    /// form where the portal's result formats request it.
    fn describe_portal(self: Arc<Self>, portal: Portal) -> Result<Vec<PgFieldDescriptor>> {
        match portal {
            Portal::Empty => Ok(vec![]),
            Portal::Portal(portal) => {
                let mut columns = infer(Some(portal.bound_result.bound), portal.statement)?;
                let formats = FormatIterator::new(&portal.result_formats, columns.len())
                    .map_err(|e| RwError::from(ErrorCode::ProtocolError(e)))?;
                columns.iter_mut().zip_eq_fast(formats).for_each(|(c, f)| {
                    if f == Format::Binary {
                        c.set_to_binary()
                    }
                });
                Ok(columns)
            }
            Portal::PureStatement(statement) => Ok(infer(None, statement)?),
        }
    }

    fn get_config(&self, key: &str) -> Result<String> {
        self.config().get(key).map_err(Into::into)
    }

    /// Delegates to the inherent `SessionImpl::set_config` (the qualified call
    /// avoids recursing into this trait method).
    fn set_config(&self, key: &str, value: String) -> Result<String> {
        Self::set_config(self, key, value)
    }

    /// Awaits the next queued notice for this session.
    async fn next_notice(self: &Arc<Self>) -> String {
        std::future::poll_fn(|cx| self.clone().notice_rx.lock().poll_recv(cx))
            .await
            .expect("notice channel should not be closed")
    }

    /// Maps the internal transaction state onto the wire-protocol status.
    fn transaction_status(&self) -> TransactionStatus {
        match &*self.txn.lock() {
            transaction::State::Initial | transaction::State::Implicit(_) => {
                TransactionStatus::Idle
            }
            transaction::State::Explicit(_) => TransactionStatus::InTransaction,
        }
    }

    /// Records that a statement is now running: installs a fresh exec context
    /// (weakly referenced from the session) and clears the idle timestamp.
    fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard {
        let exec_context = Arc::new(ExecContext {
            running_sql: sql,
            last_instant: Instant::now(),
            last_idle_instant: self.last_idle_instant.clone(),
        });
        *self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
        // Running a statement means the session is no longer idle.
        *self.last_idle_instant.lock() = None;
        ExecContextGuard::new(exec_context)
    }

    /// Returns `Err(IdleInTxnTimeout)` when the session is inside an explicit
    /// transaction, idle (no live exec context), and has been idle longer
    /// than `idle_in_transaction_session_timeout` (0 disables the check).
    fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
        if matches!(self.transaction_status(), TransactionStatus::InTransaction) {
            let idle_in_transaction_session_timeout =
                self.config().idle_in_transaction_session_timeout() as u128;
            if idle_in_transaction_session_timeout != 0 {
                let guard = self.exec_context.lock();
                // A dead weak ref means no statement is currently running.
                if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() {
                    if let Some(elapse_since_last_idle_instant) =
                        self.elapse_since_last_idle_instant()
                        && elapse_since_last_idle_instant > idle_in_transaction_session_timeout
                    {
                        return Err(PsqlError::IdleInTxnTimeout);
                    }
                }
            }
        }
        Ok(())
    }

    fn user(&self) -> String {
        self.user_name()
    }
}
1938
/// Infers the output column descriptors for `stmt` without executing it.
///
/// Query/DML statements take their fields from the bound statement (`bound`
/// must be `Some` for those variants — see the `unwrap` below); SHOW-style,
/// DESCRIBE and EXPLAIN statements have fixed descriptors; everything else
/// produces no rows.
fn infer(bound: Option<BoundStatement>, stmt: Statement) -> Result<Vec<PgFieldDescriptor>> {
    match stmt {
        Statement::Query(_)
        | Statement::Insert { .. }
        | Statement::Delete { .. }
        | Statement::Update { .. }
        | Statement::FetchCursor { .. } => Ok(bound
            // Callers guarantee a bound statement for these variants.
            .unwrap()
            .output_fields()
            .iter()
            .map(to_pg_field)
            .collect()),
        Statement::ShowObjects {
            object: show_object,
            ..
        } => Ok(infer_show_object(&show_object)),
        Statement::ShowCreateObject { .. } => Ok(infer_show_create_object()),
        Statement::ShowTransactionIsolationLevel => {
            let name = "transaction_isolation";
            Ok(infer_show_variable(name))
        }
        Statement::ShowVariable { variable } => {
            let name = &variable[0].real_value().to_lowercase();
            Ok(infer_show_variable(name))
        }
        Statement::Describe { name: _, kind } => Ok(infer_describe(&kind)),
        Statement::Explain { .. } => Ok(vec![PgFieldDescriptor::new(
            "QUERY PLAN".to_owned(),
            DataType::Varchar.to_oid(),
            DataType::Varchar.type_len(),
        )]),
        _ => Ok(vec![]),
    }
}
1974
/// Identifies a process on a specific worker node; rendered and parsed as
/// `"<worker_id>:<process_id>"` (see the `Display` and `TryFrom<String>`
/// impls below).
pub struct WorkerProcessId {
    pub worker_id: WorkerId,
    pub process_id: i32,
}
1979
1980impl WorkerProcessId {
1981 pub fn new(worker_id: WorkerId, process_id: i32) -> Self {
1982 Self {
1983 worker_id,
1984 process_id,
1985 }
1986 }
1987}
1988
1989impl Display for WorkerProcessId {
1990 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1991 write!(f, "{}:{}", self.worker_id, self.process_id)
1992 }
1993}
1994
1995impl TryFrom<String> for WorkerProcessId {
1996 type Error = String;
1997
1998 fn try_from(worker_process_id: String) -> std::result::Result<Self, Self::Error> {
1999 const INVALID: &str = "invalid WorkerProcessId";
2000 let splits: Vec<&str> = worker_process_id.split(":").collect();
2001 if splits.len() != 2 {
2002 return Err(INVALID.to_owned());
2003 }
2004 let Ok(worker_id) = splits[0].parse::<u32>() else {
2005 return Err(INVALID.to_owned());
2006 };
2007 let Ok(process_id) = splits[1].parse::<i32>() else {
2008 return Err(INVALID.to_owned());
2009 };
2010 Ok(WorkerProcessId::new(worker_id.into(), process_id))
2011 }
2012}
2013
2014pub fn cancel_queries_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2015 let guard = sessions_map.read();
2016 if let Some(session) = guard.get(&session_id) {
2017 session.cancel_current_query();
2018 true
2019 } else {
2020 info!("Current session finished, ignoring cancel query request");
2021 false
2022 }
2023}
2024
2025pub fn cancel_creating_jobs_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2026 let guard = sessions_map.read();
2027 if let Some(session) = guard.get(&session_id) {
2028 session.cancel_current_creating_job();
2029 true
2030 } else {
2031 info!("Current session finished, ignoring cancel creating request");
2032 false
2033 }
2034}