1use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::io::{Error, ErrorKind};
18use std::net::{IpAddr, Ipv4Addr, SocketAddr};
19use std::sync::atomic::{AtomicI32, Ordering};
20use std::sync::{Arc, Weak};
21use std::time::{Duration, Instant};
22
23use anyhow::anyhow;
24use bytes::Bytes;
25use either::Either;
26use itertools::Itertools;
27use parking_lot::{Mutex, RwLock, RwLockReadGuard};
28use pgwire::error::{PsqlError, PsqlResult};
29use pgwire::net::{Address, AddressRef};
30use pgwire::pg_field_descriptor::PgFieldDescriptor;
31use pgwire::pg_message::TransactionStatus;
32use pgwire::pg_response::{PgResponse, StatementType};
33use pgwire::pg_server::{
34 BoxedError, ExecContext, ExecContextGuard, Session, SessionId, SessionManager,
35 UserAuthenticator,
36};
37use pgwire::types::{Format, FormatIterator};
38use prometheus_http_query::Client as PrometheusClient;
39use rand::RngCore;
40use risingwave_batch::monitor::{BatchSpillMetrics, GLOBAL_BATCH_SPILL_METRICS};
41use risingwave_batch::spill::spill_op::SpillOp;
42use risingwave_batch::task::{ShutdownSender, ShutdownToken};
43use risingwave_batch::worker_manager::worker_node_manager::{
44 WorkerNodeManager, WorkerNodeManagerRef,
45};
46use risingwave_common::acl::AclMode;
47#[cfg(test)]
48use risingwave_common::catalog::{
49 DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
50};
51use risingwave_common::config::{
52 AuthMethod, BatchConfig, ConnectionType, FrontendConfig, MetaConfig, MetricLevel,
53 StreamingConfig, UdfConfig, load_config,
54};
55use risingwave_common::id::WorkerId;
56use risingwave_common::memory::MemoryContext;
57use risingwave_common::secret::LocalSecretManager;
58use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
59use risingwave_common::system_param::local_manager::{
60 LocalSystemParamsManager, LocalSystemParamsManagerRef,
61};
62use risingwave_common::telemetry::manager::TelemetryManager;
63use risingwave_common::telemetry::telemetry_env_enabled;
64use risingwave_common::types::DataType;
65use risingwave_common::util::addr::HostAddr;
66use risingwave_common::util::cluster_limit;
67use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
68use risingwave_common::util::iter_util::ZipEqFast;
69use risingwave_common::util::pretty_bytes::convert;
70use risingwave_common::util::runtime::BackgroundShutdownRuntime;
71use risingwave_common::{GIT_SHA, RW_VERSION};
72use risingwave_common_heap_profiling::HeapProfiler;
73use risingwave_common_service::{MetricsManager, ObserverManager};
74use risingwave_connector::source::monitor::{GLOBAL_SOURCE_METRICS, SourceMetrics};
75use risingwave_pb::common::WorkerType;
76use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
77use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
78use risingwave_pb::health::health_server::HealthServer;
79use risingwave_pb::user::auth_info::EncryptionType;
80use risingwave_rpc_client::{
81 ComputeClientPool, ComputeClientPoolRef, FrontendClientPool, FrontendClientPoolRef, MetaClient,
82};
83use risingwave_sqlparser::ast::{ObjectName, Statement};
84use risingwave_sqlparser::parser::Parser;
85use thiserror::Error;
86use thiserror_ext::AsReport;
87use tokio::runtime::Builder;
88use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
89use tokio::sync::oneshot::Sender;
90use tokio::sync::watch;
91use tokio::task::JoinHandle;
92use tracing::{error, info};
93
94use self::cursor_manager::CursorManager;
95use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError};
96use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl};
97use crate::catalog::connection_catalog::ConnectionCatalog;
98use crate::catalog::root_catalog::{Catalog, SchemaPath};
99use crate::catalog::secret_catalog::SecretCatalog;
100use crate::catalog::source_catalog::SourceCatalog;
101use crate::catalog::subscription_catalog::SubscriptionCatalog;
102use crate::catalog::{
103 CatalogError, DatabaseId, OwnedByUserCatalog, SchemaId, TableId, check_schema_writable,
104};
105use crate::error::{ErrorCode, Result, RwError};
106use crate::handler::describe::infer_describe;
107use crate::handler::extended_handle::{
108 Portal, PrepareStatement, handle_bind, handle_execute, handle_parse,
109};
110use crate::handler::privilege::ObjectCheckItem;
111use crate::handler::show::{infer_show_create_object, infer_show_object};
112use crate::handler::util::to_pg_field;
113use crate::handler::variable::infer_show_variable;
114use crate::handler::{RwPgResponse, handle};
115use crate::health_service::HealthServiceImpl;
116use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl};
117use crate::monitor::{CursorMetrics, FrontendMetrics, GLOBAL_FRONTEND_METRICS};
118use crate::observer::FrontendObserverNode;
119use crate::rpc::FrontendServiceImpl;
120use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef};
121use crate::scheduler::{
122 DistributedQueryMetrics, GLOBAL_DISTRIBUTED_QUERY_METRICS, HummockSnapshotManager,
123 HummockSnapshotManagerRef, QueryManager,
124};
125use crate::telemetry::FrontendTelemetryCreator;
126use crate::user::UserId;
127use crate::user::user_authentication::md5_hash_with_salt;
128use crate::user::user_manager::UserInfoManager;
129use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl};
130use crate::{FrontendOpts, PgResponseStream, TableCatalog};
131
132pub(crate) mod current;
133pub(crate) mod cursor_manager;
134pub(crate) mod transaction;
135
136#[derive(Clone)]
138pub(crate) struct FrontendEnv {
139 meta_client: Arc<dyn FrontendMetaClient>,
142 catalog_writer: Arc<dyn CatalogWriter>,
143 catalog_reader: CatalogReader,
144 user_info_writer: Arc<dyn UserInfoWriter>,
145 user_info_reader: UserInfoReader,
146 worker_node_manager: WorkerNodeManagerRef,
147 query_manager: QueryManager,
148 hummock_snapshot_manager: HummockSnapshotManagerRef,
149 system_params_manager: LocalSystemParamsManagerRef,
150 session_params: Arc<RwLock<SessionConfig>>,
151
152 server_addr: HostAddr,
153 client_pool: ComputeClientPoolRef,
154 frontend_client_pool: FrontendClientPoolRef,
155
156 sessions_map: SessionMapRef,
160
161 pub frontend_metrics: Arc<FrontendMetrics>,
162
163 pub cursor_metrics: Arc<CursorMetrics>,
164
165 source_metrics: Arc<SourceMetrics>,
166
167 spill_metrics: Arc<BatchSpillMetrics>,
169
170 batch_config: BatchConfig,
171 frontend_config: FrontendConfig,
172 #[expect(dead_code)]
173 meta_config: MetaConfig,
174 streaming_config: StreamingConfig,
175 udf_config: UdfConfig,
176
177 creating_streaming_job_tracker: StreamingJobTrackerRef,
180
181 compute_runtime: Arc<BackgroundShutdownRuntime>,
184
185 mem_context: MemoryContext,
187
188 serverless_backfill_controller_addr: String,
190
191 prometheus_client: Option<PrometheusClient>,
193
194 prometheus_selector: String,
196}
197
198pub type SessionMapRef = Arc<RwLock<HashMap<(i32, i32), Arc<SessionImpl>>>>;
200
201const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5;
203
204impl FrontendEnv {
205 pub fn mock() -> Self {
206 use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter};
207
208 let catalog = Arc::new(RwLock::new(Catalog::default()));
209 let meta_client = Arc::new(MockFrontendMetaClient {});
210 let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
211 let catalog_writer = Arc::new(MockCatalogWriter::new(
212 catalog.clone(),
213 hummock_snapshot_manager.clone(),
214 ));
215 let catalog_reader = CatalogReader::new(catalog);
216 let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
217 let user_info_writer = Arc::new(MockUserInfoWriter::new(user_info_manager.clone()));
218 let user_info_reader = UserInfoReader::new(user_info_manager);
219 let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
220 let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
221 let compute_client_pool = Arc::new(ComputeClientPool::for_test());
222 let frontend_client_pool = Arc::new(FrontendClientPool::for_test());
223 let query_manager = QueryManager::new(
224 worker_node_manager.clone(),
225 compute_client_pool,
226 catalog_reader.clone(),
227 Arc::new(DistributedQueryMetrics::for_test()),
228 None,
229 None,
230 );
231 let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
232 let client_pool = Arc::new(ComputeClientPool::for_test());
233 let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
234 let runtime = {
235 let mut builder = Builder::new_multi_thread();
236 if let Some(frontend_compute_runtime_worker_threads) =
237 load_config("", FrontendOpts::default())
238 .batch
239 .frontend_compute_runtime_worker_threads
240 {
241 builder.worker_threads(frontend_compute_runtime_worker_threads);
242 }
243 builder
244 .thread_name("rw-batch-local")
245 .enable_all()
246 .build()
247 .unwrap()
248 };
249 let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
250 let sessions_map = Arc::new(RwLock::new(HashMap::new()));
251 Self {
252 meta_client,
253 catalog_writer,
254 catalog_reader,
255 user_info_writer,
256 user_info_reader,
257 worker_node_manager,
258 query_manager,
259 hummock_snapshot_manager,
260 system_params_manager,
261 session_params: Default::default(),
262 server_addr,
263 client_pool,
264 frontend_client_pool,
265 sessions_map,
266 frontend_metrics: Arc::new(FrontendMetrics::for_test()),
267 cursor_metrics: Arc::new(CursorMetrics::for_test()),
268 batch_config: BatchConfig::default(),
269 frontend_config: FrontendConfig::default(),
270 meta_config: MetaConfig::default(),
271 streaming_config: StreamingConfig::default(),
272 udf_config: UdfConfig::default(),
273 source_metrics: Arc::new(SourceMetrics::default()),
274 spill_metrics: BatchSpillMetrics::for_test(),
275 creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
276 compute_runtime,
277 mem_context: MemoryContext::none(),
278 serverless_backfill_controller_addr: Default::default(),
279 prometheus_client: None,
280 prometheus_selector: String::new(),
281 }
282 }
283
284 pub async fn init(opts: FrontendOpts) -> Result<(Self, Vec<JoinHandle<()>>, Vec<Sender<()>>)> {
285 let config = load_config(&opts.config_path, &opts);
286 info!("Starting frontend node");
287 info!("> config: {:?}", config);
288 info!(
289 "> debug assertions: {}",
290 if cfg!(debug_assertions) { "on" } else { "off" }
291 );
292 info!("> version: {} ({})", RW_VERSION, GIT_SHA);
293
294 let frontend_address: HostAddr = opts
295 .advertise_addr
296 .as_ref()
297 .unwrap_or_else(|| {
298 tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
299 &opts.listen_addr
300 })
301 .parse()
302 .unwrap();
303 info!("advertise addr is {}", frontend_address);
304
305 let rpc_addr: HostAddr = opts.frontend_rpc_listener_addr.parse().unwrap();
306 let internal_rpc_host_addr = HostAddr {
307 host: frontend_address.host.clone(),
309 port: rpc_addr.port,
310 };
311 let (meta_client, system_params_reader) = MetaClient::register_new(
313 opts.meta_addr,
314 WorkerType::Frontend,
315 &frontend_address,
316 AddWorkerNodeProperty {
317 internal_rpc_host_addr: internal_rpc_host_addr.to_string(),
318 ..Default::default()
319 },
320 Arc::new(config.meta.clone()),
321 )
322 .await;
323
324 let worker_id = meta_client.worker_id();
325 info!("Assigned worker node id {}", worker_id);
326
327 let (heartbeat_join_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
328 meta_client.clone(),
329 Duration::from_millis(config.server.heartbeat_interval_ms as u64),
330 );
331 let mut join_handles = vec![heartbeat_join_handle];
332 let mut shutdown_senders = vec![heartbeat_shutdown_sender];
333
334 let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
335 let hummock_snapshot_manager =
336 Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));
337
338 let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
339 let catalog = Arc::new(RwLock::new(Catalog::default()));
340 let catalog_writer = Arc::new(CatalogWriterImpl::new(
341 meta_client.clone(),
342 catalog_updated_rx.clone(),
343 hummock_snapshot_manager.clone(),
344 ));
345 let catalog_reader = CatalogReader::new(catalog.clone());
346
347 let worker_node_manager = Arc::new(WorkerNodeManager::new());
348
349 let compute_client_pool = Arc::new(ComputeClientPool::new(
350 config.batch_exchange_connection_pool_size(),
351 config.batch.developer.compute_client_config.clone(),
352 ));
353 let frontend_client_pool = Arc::new(FrontendClientPool::new(
354 1,
355 config.batch.developer.frontend_client_config.clone(),
356 ));
357 let query_manager = QueryManager::new(
358 worker_node_manager.clone(),
359 compute_client_pool.clone(),
360 catalog_reader.clone(),
361 Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
362 config.batch.distributed_query_limit,
363 config.batch.max_batch_queries_per_frontend_node,
364 );
365
366 let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
367 let user_info_reader = UserInfoReader::new(user_info_manager.clone());
368 let user_info_writer = Arc::new(UserInfoWriterImpl::new(
369 meta_client.clone(),
370 catalog_updated_rx,
371 ));
372
373 let system_params_manager =
374 Arc::new(LocalSystemParamsManager::new(system_params_reader.clone()));
375
376 LocalSecretManager::init(
377 opts.temp_secret_file_dir,
378 meta_client.cluster_id().to_owned(),
379 worker_id,
380 );
381
382 let session_params = Arc::new(RwLock::new(SessionConfig::default()));
384 let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
385 let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));
386
387 let frontend_observer_node = FrontendObserverNode::new(
388 worker_node_manager.clone(),
389 catalog,
390 catalog_updated_tx,
391 user_info_manager,
392 hummock_snapshot_manager.clone(),
393 system_params_manager.clone(),
394 session_params.clone(),
395 compute_client_pool.clone(),
396 );
397 let observer_manager =
398 ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
399 .await;
400 let observer_join_handle = observer_manager.start().await;
401 join_handles.push(observer_join_handle);
402
403 meta_client.activate(&frontend_address).await?;
404
405 let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
406 let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
407 let spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());
408
409 if config.server.metrics_level > MetricLevel::Disabled {
410 MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
411 }
412
413 let health_srv = HealthServiceImpl::new();
414 let frontend_srv = FrontendServiceImpl::new(sessions_map.clone());
415 let frontend_rpc_addr = opts.frontend_rpc_listener_addr.parse().unwrap();
416
417 let telemetry_manager = TelemetryManager::new(
418 Arc::new(meta_client.clone()),
419 Arc::new(FrontendTelemetryCreator::new()),
420 );
421
422 if config.server.telemetry_enabled && telemetry_env_enabled() {
425 let (join_handle, shutdown_sender) = telemetry_manager.start().await;
426 join_handles.push(join_handle);
427 shutdown_senders.push(shutdown_sender);
428 } else {
429 tracing::info!("Telemetry didn't start due to config");
430 }
431
432 tokio::spawn(async move {
433 tonic::transport::Server::builder()
434 .add_service(HealthServer::new(health_srv))
435 .add_service(FrontendServiceServer::new(frontend_srv))
436 .serve(frontend_rpc_addr)
437 .await
438 .unwrap();
439 });
440 info!(
441 "Health Check RPC Listener is set up on {}",
442 opts.frontend_rpc_listener_addr.clone()
443 );
444
445 let creating_streaming_job_tracker =
446 Arc::new(StreamingJobTracker::new(frontend_meta_client.clone()));
447
448 let runtime = {
449 let mut builder = Builder::new_multi_thread();
450 if let Some(frontend_compute_runtime_worker_threads) =
451 config.batch.frontend_compute_runtime_worker_threads
452 {
453 builder.worker_threads(frontend_compute_runtime_worker_threads);
454 }
455 builder
456 .thread_name("rw-batch-local")
457 .enable_all()
458 .build()
459 .unwrap()
460 };
461 let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
462
463 let sessions = sessions_map.clone();
464 let join_handle = tokio::spawn(async move {
466 let mut check_idle_txn_interval =
467 tokio::time::interval(core::time::Duration::from_secs(10));
468 check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
469 check_idle_txn_interval.reset();
470 loop {
471 check_idle_txn_interval.tick().await;
472 sessions.read().values().for_each(|session| {
473 let _ = session.check_idle_in_transaction_timeout();
474 })
475 }
476 });
477 join_handles.push(join_handle);
478
479 #[cfg(not(madsim))]
481 if config.batch.enable_spill {
482 SpillOp::clean_spill_directory()
483 .await
484 .map_err(|err| anyhow!(err))?;
485 }
486
487 let total_memory_bytes = opts.frontend_total_memory_bytes;
488 let heap_profiler =
489 HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
490 heap_profiler.start();
492
493 let batch_memory_limit = total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION;
494 let mem_context = MemoryContext::root(
495 frontend_metrics.batch_total_mem.clone(),
496 batch_memory_limit as u64,
497 );
498
499 let prometheus_client = if let Some(ref endpoint) = opts.prometheus_endpoint {
501 match PrometheusClient::try_from(endpoint.as_str()) {
502 Ok(client) => {
503 info!("Prometheus client initialized with endpoint: {}", endpoint);
504 Some(client)
505 }
506 Err(e) => {
507 error!(
508 error = %e.as_report(),
509 "failed to initialize Prometheus client",
510 );
511 None
512 }
513 }
514 } else {
515 None
516 };
517
518 let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
520
521 info!(
522 "Frontend total_memory: {} batch_memory: {}",
523 convert(total_memory_bytes as _),
524 convert(batch_memory_limit as _),
525 );
526
527 Ok((
528 Self {
529 catalog_reader,
530 catalog_writer,
531 user_info_reader,
532 user_info_writer,
533 worker_node_manager,
534 meta_client: frontend_meta_client,
535 query_manager,
536 hummock_snapshot_manager,
537 system_params_manager,
538 session_params,
539 server_addr: frontend_address,
540 client_pool: compute_client_pool,
541 frontend_client_pool,
542 frontend_metrics,
543 cursor_metrics,
544 spill_metrics,
545 sessions_map,
546 batch_config: config.batch,
547 frontend_config: config.frontend,
548 meta_config: config.meta,
549 streaming_config: config.streaming,
550 serverless_backfill_controller_addr: opts.serverless_backfill_controller_addr,
551 udf_config: config.udf,
552 source_metrics,
553 creating_streaming_job_tracker,
554 compute_runtime,
555 mem_context,
556 prometheus_client,
557 prometheus_selector,
558 },
559 join_handles,
560 shutdown_senders,
561 ))
562 }
563
564 fn catalog_writer(&self, _guard: transaction::WriteGuard) -> &dyn CatalogWriter {
569 &*self.catalog_writer
570 }
571
572 pub fn catalog_reader(&self) -> &CatalogReader {
574 &self.catalog_reader
575 }
576
577 fn user_info_writer(&self, _guard: transaction::WriteGuard) -> &dyn UserInfoWriter {
582 &*self.user_info_writer
583 }
584
585 pub fn user_info_reader(&self) -> &UserInfoReader {
587 &self.user_info_reader
588 }
589
590 pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef {
591 self.worker_node_manager.clone()
592 }
593
594 pub fn meta_client(&self) -> &dyn FrontendMetaClient {
595 &*self.meta_client
596 }
597
598 pub fn meta_client_ref(&self) -> Arc<dyn FrontendMetaClient> {
599 self.meta_client.clone()
600 }
601
602 pub fn query_manager(&self) -> &QueryManager {
603 &self.query_manager
604 }
605
606 pub fn hummock_snapshot_manager(&self) -> &HummockSnapshotManagerRef {
607 &self.hummock_snapshot_manager
608 }
609
610 pub fn system_params_manager(&self) -> &LocalSystemParamsManagerRef {
611 &self.system_params_manager
612 }
613
614 pub fn session_params_snapshot(&self) -> SessionConfig {
615 self.session_params.read_recursive().clone()
616 }
617
618 pub fn sbc_address(&self) -> &String {
619 &self.serverless_backfill_controller_addr
620 }
621
622 pub fn prometheus_client(&self) -> Option<&PrometheusClient> {
624 self.prometheus_client.as_ref()
625 }
626
627 pub fn prometheus_selector(&self) -> &str {
629 &self.prometheus_selector
630 }
631
632 pub fn server_address(&self) -> &HostAddr {
633 &self.server_addr
634 }
635
636 pub fn client_pool(&self) -> ComputeClientPoolRef {
637 self.client_pool.clone()
638 }
639
640 pub fn frontend_client_pool(&self) -> FrontendClientPoolRef {
641 self.frontend_client_pool.clone()
642 }
643
644 pub fn batch_config(&self) -> &BatchConfig {
645 &self.batch_config
646 }
647
648 pub fn frontend_config(&self) -> &FrontendConfig {
649 &self.frontend_config
650 }
651
652 pub fn streaming_config(&self) -> &StreamingConfig {
653 &self.streaming_config
654 }
655
656 pub fn udf_config(&self) -> &UdfConfig {
657 &self.udf_config
658 }
659
660 pub fn source_metrics(&self) -> Arc<SourceMetrics> {
661 self.source_metrics.clone()
662 }
663
664 pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
665 self.spill_metrics.clone()
666 }
667
668 pub fn creating_streaming_job_tracker(&self) -> &StreamingJobTrackerRef {
669 &self.creating_streaming_job_tracker
670 }
671
672 pub fn sessions_map(&self) -> &SessionMapRef {
673 &self.sessions_map
674 }
675
676 pub fn compute_runtime(&self) -> Arc<BackgroundShutdownRuntime> {
677 self.compute_runtime.clone()
678 }
679
680 pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {
683 cancel_queries_in_session(session_id, self.sessions_map.clone())
684 }
685
686 pub fn cancel_creating_jobs_in_session(&self, session_id: SessionId) -> bool {
689 cancel_creating_jobs_in_session(session_id, self.sessions_map.clone())
690 }
691
692 pub fn mem_context(&self) -> MemoryContext {
693 self.mem_context.clone()
694 }
695}
696
697#[derive(Clone)]
698pub struct AuthContext {
699 pub database: String,
700 pub user_name: String,
701 pub user_id: UserId,
702}
703
704impl AuthContext {
705 pub fn new(database: String, user_name: String, user_id: UserId) -> Self {
706 Self {
707 database,
708 user_name,
709 user_id,
710 }
711 }
712}
713pub struct SessionImpl {
714 env: FrontendEnv,
715 auth_context: Arc<RwLock<AuthContext>>,
716 user_authenticator: UserAuthenticator,
718 config_map: Arc<RwLock<SessionConfig>>,
720
721 notice_tx: UnboundedSender<String>,
723 notice_rx: Mutex<UnboundedReceiver<String>>,
725
726 id: (i32, i32),
728
729 peer_addr: AddressRef,
731
732 txn: Arc<Mutex<transaction::State>>,
736
737 current_query_cancel_flag: Mutex<Option<ShutdownSender>>,
741
742 exec_context: Mutex<Option<Weak<ExecContext>>>,
744
745 last_idle_instant: Arc<Mutex<Option<Instant>>>,
747
748 cursor_manager: Arc<CursorManager>,
749
750 temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,
752
753 staging_catalog_manager: Arc<Mutex<StagingCatalogManager>>,
755}
756
757#[derive(Default, Clone)]
766pub struct TemporarySourceManager {
767 sources: HashMap<String, SourceCatalog>,
768}
769
770impl TemporarySourceManager {
771 pub fn new() -> Self {
772 Self {
773 sources: HashMap::new(),
774 }
775 }
776
777 pub fn create_source(&mut self, name: String, source: SourceCatalog) {
778 self.sources.insert(name, source);
779 }
780
781 pub fn drop_source(&mut self, name: &str) {
782 self.sources.remove(name);
783 }
784
785 pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
786 self.sources.get(name)
787 }
788
789 pub fn keys(&self) -> Vec<String> {
790 self.sources.keys().cloned().collect()
791 }
792}
793
794#[derive(Default, Clone)]
796pub struct StagingCatalogManager {
797 tables: HashMap<String, TableCatalog>,
799}
800
801impl StagingCatalogManager {
802 pub fn new() -> Self {
803 Self {
804 tables: HashMap::new(),
805 }
806 }
807
808 pub fn create_table(&mut self, name: String, table: TableCatalog) {
809 self.tables.insert(name, table);
810 }
811
812 pub fn drop_table(&mut self, name: &str) {
813 self.tables.remove(name);
814 }
815
816 pub fn get_table(&self, name: &str) -> Option<&TableCatalog> {
817 self.tables.get(name)
818 }
819}
820
821#[derive(Error, Debug)]
822pub enum CheckRelationError {
823 #[error("{0}")]
824 Resolve(#[from] ResolveQualifiedNameError),
825 #[error("{0}")]
826 Catalog(#[from] CatalogError),
827}
828
829impl From<CheckRelationError> for RwError {
830 fn from(e: CheckRelationError) -> Self {
831 match e {
832 CheckRelationError::Resolve(e) => e.into(),
833 CheckRelationError::Catalog(e) => e.into(),
834 }
835 }
836}
837
838impl SessionImpl {
839 pub(crate) fn new(
840 env: FrontendEnv,
841 auth_context: AuthContext,
842 user_authenticator: UserAuthenticator,
843 id: SessionId,
844 peer_addr: AddressRef,
845 session_config: SessionConfig,
846 ) -> Self {
847 let cursor_metrics = env.cursor_metrics.clone();
848 let (notice_tx, notice_rx) = mpsc::unbounded_channel();
849
850 Self {
851 env,
852 auth_context: Arc::new(RwLock::new(auth_context)),
853 user_authenticator,
854 config_map: Arc::new(RwLock::new(session_config)),
855 id,
856 peer_addr,
857 txn: Default::default(),
858 current_query_cancel_flag: Mutex::new(None),
859 notice_tx,
860 notice_rx: Mutex::new(notice_rx),
861 exec_context: Mutex::new(None),
862 last_idle_instant: Default::default(),
863 cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
864 temporary_source_manager: Default::default(),
865 staging_catalog_manager: Default::default(),
866 }
867 }
868
869 #[cfg(test)]
870 pub fn mock() -> Self {
871 let env = FrontendEnv::mock();
872 let (notice_tx, notice_rx) = mpsc::unbounded_channel();
873
874 Self {
875 env: FrontendEnv::mock(),
876 auth_context: Arc::new(RwLock::new(AuthContext::new(
877 DEFAULT_DATABASE_NAME.to_owned(),
878 DEFAULT_SUPER_USER.to_owned(),
879 DEFAULT_SUPER_USER_ID,
880 ))),
881 user_authenticator: UserAuthenticator::None,
882 config_map: Default::default(),
883 id: (0, 0),
885 txn: Default::default(),
886 current_query_cancel_flag: Mutex::new(None),
887 notice_tx,
888 notice_rx: Mutex::new(notice_rx),
889 exec_context: Mutex::new(None),
890 peer_addr: Address::Tcp(SocketAddr::new(
891 IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
892 8080,
893 ))
894 .into(),
895 last_idle_instant: Default::default(),
896 cursor_manager: Arc::new(CursorManager::new(env.cursor_metrics)),
897 temporary_source_manager: Default::default(),
898 staging_catalog_manager: Default::default(),
899 }
900 }
901
902 pub(crate) fn env(&self) -> &FrontendEnv {
903 &self.env
904 }
905
906 pub fn auth_context(&self) -> Arc<AuthContext> {
907 let ctx = self.auth_context.read();
908 Arc::new(ctx.clone())
909 }
910
911 pub fn database(&self) -> String {
912 self.auth_context.read().database.clone()
913 }
914
915 pub fn database_id(&self) -> DatabaseId {
916 let db_name = self.database();
917 self.env
918 .catalog_reader()
919 .read_guard()
920 .get_database_by_name(&db_name)
921 .map(|db| db.id())
922 .expect("session database not found")
923 }
924
925 pub fn user_name(&self) -> String {
926 self.auth_context.read().user_name.clone()
927 }
928
929 pub fn user_id(&self) -> UserId {
930 self.auth_context.read().user_id
931 }
932
933 pub fn update_database(&self, database: String) {
934 self.auth_context.write().database = database;
935 }
936
937 pub fn shared_config(&self) -> Arc<RwLock<SessionConfig>> {
938 Arc::clone(&self.config_map)
939 }
940
941 pub fn config(&self) -> RwLockReadGuard<'_, SessionConfig> {
942 self.config_map.read()
943 }
944
945 pub fn set_config(&self, key: &str, value: String) -> Result<String> {
946 self.config_map
947 .write()
948 .set(key, value, &mut ())
949 .map_err(Into::into)
950 }
951
952 pub fn reset_config(&self, key: &str) -> Result<String> {
953 self.config_map
954 .write()
955 .reset(key, &mut ())
956 .map_err(Into::into)
957 }
958
959 pub fn set_config_report(
960 &self,
961 key: &str,
962 value: Option<String>,
963 mut reporter: impl ConfigReporter,
964 ) -> Result<String> {
965 if let Some(value) = value {
966 self.config_map
967 .write()
968 .set(key, value, &mut reporter)
969 .map_err(Into::into)
970 } else {
971 self.config_map
972 .write()
973 .reset(key, &mut reporter)
974 .map_err(Into::into)
975 }
976 }
977
978 pub fn session_id(&self) -> SessionId {
979 self.id
980 }
981
982 pub fn running_sql(&self) -> Option<Arc<str>> {
983 self.exec_context
984 .lock()
985 .as_ref()
986 .and_then(|weak| weak.upgrade())
987 .map(|context| context.running_sql.clone())
988 }
989
990 pub fn get_cursor_manager(&self) -> Arc<CursorManager> {
991 self.cursor_manager.clone()
992 }
993
994 pub fn peer_addr(&self) -> &Address {
995 &self.peer_addr
996 }
997
998 pub fn elapse_since_running_sql(&self) -> Option<u128> {
999 self.exec_context
1000 .lock()
1001 .as_ref()
1002 .and_then(|weak| weak.upgrade())
1003 .map(|context| context.last_instant.elapsed().as_millis())
1004 }
1005
1006 pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
1007 self.last_idle_instant
1008 .lock()
1009 .as_ref()
1010 .map(|x| x.elapsed().as_millis())
1011 }
1012
1013 pub fn check_relation_name_duplicated(
1014 &self,
1015 name: ObjectName,
1016 stmt_type: StatementType,
1017 if_not_exists: bool,
1018 ) -> std::result::Result<Either<(), RwPgResponse>, CheckRelationError> {
1019 let db_name = &self.database();
1020 let catalog_reader = self.env().catalog_reader().read_guard();
1021 let (schema_name, relation_name) = {
1022 let (schema_name, relation_name) =
1023 Binder::resolve_schema_qualified_name(db_name, &name)?;
1024 let search_path = self.config().search_path();
1025 let user_name = &self.user_name();
1026 let schema_name = match schema_name {
1027 Some(schema_name) => schema_name,
1028 None => catalog_reader
1029 .first_valid_schema(db_name, &search_path, user_name)?
1030 .name(),
1031 };
1032 (schema_name, relation_name)
1033 };
1034 match catalog_reader.check_relation_name_duplicated(db_name, &schema_name, &relation_name) {
1035 Err(CatalogError::Duplicated(_, name, is_creating)) if if_not_exists => {
1036 if !is_creating {
1041 Ok(Either::Right(
1042 PgResponse::builder(stmt_type)
1043 .notice(format!("relation \"{}\" already exists, skipping", name))
1044 .into(),
1045 ))
1046 } else if stmt_type == StatementType::CREATE_SUBSCRIPTION {
1047 Ok(Either::Right(
1050 PgResponse::builder(stmt_type)
1051 .notice(format!(
1052 "relation \"{}\" already exists but still creating, skipping",
1053 name
1054 ))
1055 .into(),
1056 ))
1057 } else {
1058 Ok(Either::Left(()))
1059 }
1060 }
1061 Err(e) => Err(e.into()),
1062 Ok(_) => Ok(Either::Left(())),
1063 }
1064 }
1065
1066 pub fn check_secret_name_duplicated(&self, name: ObjectName) -> Result<()> {
1067 let db_name = &self.database();
1068 let catalog_reader = self.env().catalog_reader().read_guard();
1069 let (schema_name, secret_name) = {
1070 let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1071 let search_path = self.config().search_path();
1072 let user_name = &self.user_name();
1073 let schema_name = match schema_name {
1074 Some(schema_name) => schema_name,
1075 None => catalog_reader
1076 .first_valid_schema(db_name, &search_path, user_name)?
1077 .name(),
1078 };
1079 (schema_name, secret_name)
1080 };
1081 catalog_reader
1082 .check_secret_name_duplicated(db_name, &schema_name, &secret_name)
1083 .map_err(RwError::from)
1084 }
1085
1086 pub fn check_connection_name_duplicated(&self, name: ObjectName) -> Result<()> {
1087 let db_name = &self.database();
1088 let catalog_reader = self.env().catalog_reader().read_guard();
1089 let (schema_name, connection_name) = {
1090 let (schema_name, connection_name) =
1091 Binder::resolve_schema_qualified_name(db_name, &name)?;
1092 let search_path = self.config().search_path();
1093 let user_name = &self.user_name();
1094 let schema_name = match schema_name {
1095 Some(schema_name) => schema_name,
1096 None => catalog_reader
1097 .first_valid_schema(db_name, &search_path, user_name)?
1098 .name(),
1099 };
1100 (schema_name, connection_name)
1101 };
1102 catalog_reader
1103 .check_connection_name_duplicated(db_name, &schema_name, &connection_name)
1104 .map_err(RwError::from)
1105 }
1106
1107 pub fn check_function_name_duplicated(
1108 &self,
1109 stmt_type: StatementType,
1110 name: ObjectName,
1111 arg_types: &[DataType],
1112 if_not_exists: bool,
1113 ) -> Result<Either<(), RwPgResponse>> {
1114 let db_name = &self.database();
1115 let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1116 let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?;
1117
1118 let catalog_reader = self.env().catalog_reader().read_guard();
1119 if catalog_reader
1120 .get_schema_by_id(database_id, schema_id)?
1121 .get_function_by_name_args(&function_name, arg_types)
1122 .is_some()
1123 {
1124 let full_name = format!(
1125 "{function_name}({})",
1126 arg_types.iter().map(|t| t.to_string()).join(",")
1127 );
1128 if if_not_exists {
1129 Ok(Either::Right(
1130 PgResponse::builder(stmt_type)
1131 .notice(format!(
1132 "function \"{}\" already exists, skipping",
1133 full_name
1134 ))
1135 .into(),
1136 ))
1137 } else {
1138 Err(CatalogError::duplicated("function", full_name).into())
1139 }
1140 } else {
1141 Ok(Either::Left(()))
1142 }
1143 }
1144
1145 pub fn get_database_and_schema_id_for_create(
1147 &self,
1148 schema_name: Option<String>,
1149 ) -> Result<(DatabaseId, SchemaId)> {
1150 let db_name = &self.database();
1151
1152 let search_path = self.config().search_path();
1153 let user_name = &self.user_name();
1154
1155 let catalog_reader = self.env().catalog_reader().read_guard();
1156 let schema = match schema_name {
1157 Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
1158 None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
1159 };
1160 let schema_name = schema.name();
1161
1162 check_schema_writable(&schema_name)?;
1163 self.check_privileges(&[ObjectCheckItem::new(
1164 schema.owner(),
1165 AclMode::Create,
1166 schema_name,
1167 schema.id(),
1168 )])?;
1169
1170 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1171 Ok((db_id, schema.id()))
1172 }
1173
1174 pub fn get_connection_by_name(
1175 &self,
1176 schema_name: Option<String>,
1177 connection_name: &str,
1178 ) -> Result<Arc<ConnectionCatalog>> {
1179 let db_name = &self.database();
1180 let search_path = self.config().search_path();
1181 let user_name = &self.user_name();
1182
1183 let catalog_reader = self.env().catalog_reader().read_guard();
1184 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1185 let (connection, _) =
1186 catalog_reader.get_connection_by_name(db_name, schema_path, connection_name)?;
1187
1188 self.check_privileges(&[ObjectCheckItem::new(
1189 connection.owner(),
1190 AclMode::Usage,
1191 connection.name.clone(),
1192 connection.id,
1193 )])?;
1194
1195 Ok(connection.clone())
1196 }
1197
1198 pub fn get_subscription_by_schema_id_name(
1199 &self,
1200 schema_id: SchemaId,
1201 subscription_name: &str,
1202 ) -> Result<Arc<SubscriptionCatalog>> {
1203 let db_name = &self.database();
1204
1205 let catalog_reader = self.env().catalog_reader().read_guard();
1206 let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1207 let schema = catalog_reader.get_schema_by_id(db_id, schema_id)?;
1208 let subscription = schema
1209 .get_subscription_by_name(subscription_name)
1210 .ok_or_else(|| {
1211 RwError::from(ErrorCode::ItemNotFound(format!(
1212 "subscription {} not found",
1213 subscription_name
1214 )))
1215 })?;
1216 Ok(subscription.clone())
1217 }
1218
1219 pub fn get_subscription_by_name(
1220 &self,
1221 schema_name: Option<String>,
1222 subscription_name: &str,
1223 ) -> Result<Arc<SubscriptionCatalog>> {
1224 let db_name = &self.database();
1225 let search_path = self.config().search_path();
1226 let user_name = &self.user_name();
1227
1228 let catalog_reader = self.env().catalog_reader().read_guard();
1229 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1230 let (subscription, _) =
1231 catalog_reader.get_subscription_by_name(db_name, schema_path, subscription_name)?;
1232 Ok(subscription.clone())
1233 }
1234
1235 pub fn get_table_by_id(&self, table_id: TableId) -> Result<Arc<TableCatalog>> {
1236 let catalog_reader = self.env().catalog_reader().read_guard();
1237 Ok(catalog_reader.get_any_table_by_id(table_id)?.clone())
1238 }
1239
1240 pub fn get_table_by_name(
1241 &self,
1242 table_name: &str,
1243 db_id: DatabaseId,
1244 schema_id: SchemaId,
1245 ) -> Result<Arc<TableCatalog>> {
1246 let catalog_reader = self.env().catalog_reader().read_guard();
1247 let table = catalog_reader
1248 .get_schema_by_id(db_id, schema_id)?
1249 .get_created_table_by_name(table_name)
1250 .ok_or_else(|| {
1251 Error::new(
1252 ErrorKind::InvalidInput,
1253 format!("table \"{}\" does not exist", table_name),
1254 )
1255 })?;
1256
1257 self.check_privileges(&[ObjectCheckItem::new(
1258 table.owner(),
1259 AclMode::Select,
1260 table_name.to_owned(),
1261 table.id,
1262 )])?;
1263
1264 Ok(table.clone())
1265 }
1266
1267 pub fn get_secret_by_name(
1268 &self,
1269 schema_name: Option<String>,
1270 secret_name: &str,
1271 ) -> Result<Arc<SecretCatalog>> {
1272 let db_name = &self.database();
1273 let search_path = self.config().search_path();
1274 let user_name = &self.user_name();
1275
1276 let catalog_reader = self.env().catalog_reader().read_guard();
1277 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1278 let (secret, _) = catalog_reader.get_secret_by_name(db_name, schema_path, secret_name)?;
1279
1280 self.check_privileges(&[ObjectCheckItem::new(
1281 secret.owner(),
1282 AclMode::Usage,
1283 secret.name.clone(),
1284 secret.id,
1285 )])?;
1286
1287 Ok(secret.clone())
1288 }
1289
1290 pub fn list_change_log_epochs(
1291 &self,
1292 table_id: TableId,
1293 min_epoch: u64,
1294 max_count: u32,
1295 ) -> Result<Vec<u64>> {
1296 Ok(self
1297 .env
1298 .hummock_snapshot_manager()
1299 .acquire()
1300 .list_change_log_epochs(table_id, min_epoch, max_count))
1301 }
1302
1303 pub fn clear_cancel_query_flag(&self) {
1304 let mut flag = self.current_query_cancel_flag.lock();
1305 *flag = None;
1306 }
1307
1308 pub fn reset_cancel_query_flag(&self) -> ShutdownToken {
1309 let mut flag = self.current_query_cancel_flag.lock();
1310 let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
1311 *flag = Some(shutdown_tx);
1312 shutdown_rx
1313 }
1314
1315 pub fn cancel_current_query(&self) {
1316 let mut flag_guard = self.current_query_cancel_flag.lock();
1317 if let Some(sender) = flag_guard.take() {
1318 info!("Trying to cancel query in local mode.");
1319 sender.cancel();
1321 info!("Cancel query request sent.");
1322 } else {
1323 info!("Trying to cancel query in distributed mode.");
1324 self.env.query_manager().cancel_queries_in_session(self.id)
1325 }
1326 }
1327
1328 pub fn cancel_current_creating_job(&self) {
1329 self.env.creating_streaming_job_tracker.abort_jobs(self.id);
1330 }
1331
1332 pub async fn run_statement(
1335 self: Arc<Self>,
1336 sql: Arc<str>,
1337 formats: Vec<Format>,
1338 ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
1339 let mut stmts = Parser::parse_sql(&sql)?;
1341 if stmts.is_empty() {
1342 return Ok(PgResponse::empty_result(
1343 pgwire::pg_response::StatementType::EMPTY,
1344 ));
1345 }
1346 if stmts.len() > 1 {
1347 return Ok(
1348 PgResponse::builder(pgwire::pg_response::StatementType::EMPTY)
1349 .notice("cannot insert multiple commands into statement")
1350 .into(),
1351 );
1352 }
1353 let stmt = stmts.swap_remove(0);
1354 let rsp = handle(self, stmt, sql.clone(), formats).await?;
1355 Ok(rsp)
1356 }
1357
1358 pub fn notice_to_user(&self, str: impl Into<String>) {
1359 let notice = str.into();
1360 tracing::trace!(notice, "notice to user");
1361 self.notice_tx
1362 .send(notice)
1363 .expect("notice channel should not be closed");
1364 }
1365
1366 pub fn is_barrier_read(&self) -> bool {
1367 match self.config().visibility_mode() {
1368 VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
1369 VisibilityMode::All => true,
1370 VisibilityMode::Checkpoint => false,
1371 }
1372 }
1373
1374 pub fn statement_timeout(&self) -> Duration {
1375 if self.config().statement_timeout() == 0 {
1376 Duration::from_secs(self.env.batch_config.statement_timeout_in_sec as u64)
1377 } else {
1378 Duration::from_secs(self.config().statement_timeout() as u64)
1379 }
1380 }
1381
1382 pub fn create_temporary_source(&self, source: SourceCatalog) {
1383 self.temporary_source_manager
1384 .lock()
1385 .create_source(source.name.clone(), source);
1386 }
1387
1388 pub fn get_temporary_source(&self, name: &str) -> Option<SourceCatalog> {
1389 self.temporary_source_manager
1390 .lock()
1391 .get_source(name)
1392 .cloned()
1393 }
1394
1395 pub fn drop_temporary_source(&self, name: &str) {
1396 self.temporary_source_manager.lock().drop_source(name);
1397 }
1398
1399 pub fn temporary_source_manager(&self) -> TemporarySourceManager {
1400 self.temporary_source_manager.lock().clone()
1401 }
1402
1403 pub fn create_staging_table(&self, table: TableCatalog) {
1404 self.staging_catalog_manager
1405 .lock()
1406 .create_table(table.name.clone(), table);
1407 }
1408
1409 pub fn drop_staging_table(&self, name: &str) {
1410 self.staging_catalog_manager.lock().drop_table(name);
1411 }
1412
1413 pub fn staging_catalog_manager(&self) -> StagingCatalogManager {
1414 self.staging_catalog_manager.lock().clone()
1415 }
1416
1417 pub async fn check_cluster_limits(&self) -> Result<()> {
1418 if self.config().bypass_cluster_limits() {
1419 return Ok(());
1420 }
1421
1422 let gen_message = |ActorCountPerParallelism {
1423 worker_id_to_actor_count,
1424 hard_limit,
1425 soft_limit,
1426 }: ActorCountPerParallelism,
1427 exceed_hard_limit: bool|
1428 -> String {
1429 let (limit_type, action) = if exceed_hard_limit {
1430 ("critical", "Scale the cluster immediately to proceed.")
1431 } else {
1432 (
1433 "recommended",
1434 "Consider scaling the cluster for optimal performance.",
1435 )
1436 };
1437 format!(
1438 r#"Actor count per parallelism exceeds the {limit_type} limit.
1439
1440Depending on your workload, this may overload the cluster and cause performance/stability issues. {action}
1441
1442HINT:
1443- For best practices on managing streaming jobs: https://docs.risingwave.com/operate/manage-a-large-number-of-streaming-jobs
1444- To bypass the check (if the cluster load is acceptable): `[ALTER SYSTEM] SET bypass_cluster_limits TO true`.
1445 See https://docs.risingwave.com/operate/view-configure-runtime-parameters#how-to-configure-runtime-parameters
1446- Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.
1447
1448DETAILS:
1449- hard limit: {hard_limit}
1450- soft limit: {soft_limit}
1451- worker_id_to_actor_count: {worker_id_to_actor_count:?}"#,
1452 )
1453 };
1454
1455 let limits = self.env().meta_client().get_cluster_limits().await?;
1456 for limit in limits {
1457 match limit {
1458 cluster_limit::ClusterLimit::ActorCount(l) => {
1459 if l.exceed_hard_limit() {
1460 return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
1461 l, true,
1462 ))));
1463 } else if l.exceed_soft_limit() {
1464 self.notice_to_user(gen_message(l, false));
1465 }
1466 }
1467 }
1468 }
1469 Ok(())
1470 }
1471}
1472
/// Process-wide slot holding the shared [`SessionManagerImpl`].
///
/// Starts empty; being a `OnceLock`, it can be initialized at most once.
pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
    std::sync::OnceLock::new();
1475
/// Session manager of the frontend: creates, tracks and removes [`SessionImpl`]s.
pub struct SessionManagerImpl {
    /// Shared frontend environment (catalog/user readers, session map, metrics, ...).
    env: FrontendEnv,
    /// Join handles returned by `FrontendEnv::init`; stored but never read here.
    /// NOTE(review): presumably retained for ownership of the background tasks; confirm.
    _join_handles: Vec<JoinHandle<()>>,
    /// Shutdown senders returned by `FrontendEnv::init`; stored but never read here.
    /// NOTE(review): presumably retained so the receivers are not closed early; confirm.
    _shutdown_senders: Vec<Sender<()>>,
    /// Monotonic counter used to derive the id of each newly connected session.
    number: AtomicI32,
}
1482
1483impl SessionManager for SessionManagerImpl {
1484 type Session = SessionImpl;
1485
1486 fn create_dummy_session(
1487 &self,
1488 database_id: DatabaseId,
1489 user_id: u32,
1490 ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
1491 let dummy_addr = Address::Tcp(SocketAddr::new(
1492 IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1493 5691, ));
1495 let user_reader = self.env.user_info_reader();
1496 let reader = user_reader.read_guard();
1497 if let Some(user_name) = reader.get_user_name_by_id(user_id) {
1498 self.connect_inner(database_id, user_name.as_str(), Arc::new(dummy_addr))
1499 } else {
1500 Err(Box::new(Error::new(
1501 ErrorKind::InvalidInput,
1502 format!("Role id {} does not exist", user_id),
1503 )))
1504 }
1505 }
1506
1507 fn connect(
1508 &self,
1509 database: &str,
1510 user_name: &str,
1511 peer_addr: AddressRef,
1512 ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
1513 let catalog_reader = self.env.catalog_reader();
1514 let reader = catalog_reader.read_guard();
1515 let database_id = reader
1516 .get_database_by_name(database)
1517 .map_err(|_| {
1518 Box::new(Error::new(
1519 ErrorKind::InvalidInput,
1520 format!("database \"{}\" does not exist", database),
1521 ))
1522 })?
1523 .id();
1524
1525 self.connect_inner(database_id, user_name, peer_addr)
1526 }
1527
1528 fn cancel_queries_in_session(&self, session_id: SessionId) {
1530 self.env.cancel_queries_in_session(session_id);
1531 }
1532
1533 fn cancel_creating_jobs_in_session(&self, session_id: SessionId) {
1534 self.env.cancel_creating_jobs_in_session(session_id);
1535 }
1536
1537 fn end_session(&self, session: &Self::Session) {
1538 self.delete_session(&session.session_id());
1539 }
1540
1541 async fn shutdown(&self) {
1542 self.env.sessions_map().write().clear();
1544 self.env.meta_client().try_unregister().await;
1546 }
1547}
1548
1549impl SessionManagerImpl {
1550 pub async fn new(opts: FrontendOpts) -> Result<Self> {
1551 let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?;
1553 Ok(Self {
1554 env,
1555 _join_handles: join_handles,
1556 _shutdown_senders: shutdown_senders,
1557 number: AtomicI32::new(0),
1558 })
1559 }
1560
1561 pub(crate) fn env(&self) -> &FrontendEnv {
1562 &self.env
1563 }
1564
1565 fn insert_session(&self, session: Arc<SessionImpl>) {
1566 let active_sessions = {
1567 let mut write_guard = self.env.sessions_map.write();
1568 write_guard.insert(session.id(), session);
1569 write_guard.len()
1570 };
1571 self.env
1572 .frontend_metrics
1573 .active_sessions
1574 .set(active_sessions as i64);
1575 }
1576
1577 fn delete_session(&self, session_id: &SessionId) {
1578 let active_sessions = {
1579 let mut write_guard = self.env.sessions_map.write();
1580 write_guard.remove(session_id);
1581 write_guard.len()
1582 };
1583 self.env
1584 .frontend_metrics
1585 .active_sessions
1586 .set(active_sessions as i64);
1587 }
1588
1589 fn connect_inner(
1590 &self,
1591 database_id: DatabaseId,
1592 user_name: &str,
1593 peer_addr: AddressRef,
1594 ) -> std::result::Result<Arc<SessionImpl>, BoxedError> {
1595 let catalog_reader = self.env.catalog_reader();
1596 let reader = catalog_reader.read_guard();
1597 let (database_name, database_owner) = {
1598 let db = reader.get_database_by_id(database_id).map_err(|_| {
1599 Box::new(Error::new(
1600 ErrorKind::InvalidInput,
1601 format!("database \"{}\" does not exist", database_id),
1602 ))
1603 })?;
1604 (db.name(), db.owner())
1605 };
1606
1607 let user_reader = self.env.user_info_reader();
1608 let reader = user_reader.read_guard();
1609 if let Some(user) = reader.get_user_by_name(user_name) {
1610 if !user.can_login {
1611 return Err(Box::new(Error::new(
1612 ErrorKind::InvalidInput,
1613 format!("User {} is not allowed to login", user_name),
1614 )));
1615 }
1616 let has_privilege = user.has_privilege(database_id, AclMode::Connect);
1617 if !user.is_super && database_owner != user.id && !has_privilege {
1618 return Err(Box::new(Error::new(
1619 ErrorKind::PermissionDenied,
1620 "User does not have CONNECT privilege.",
1621 )));
1622 }
1623
1624 let (connection_type, client_addr) = match peer_addr.as_ref() {
1626 Address::Tcp(socket_addr) => (ConnectionType::Host, Some(&socket_addr.ip())),
1627 Address::Unix(_) => (ConnectionType::Local, None),
1628 };
1629 tracing::debug!(
1630 "receive connection: type={:?}, client_addr={:?}",
1631 connection_type,
1632 client_addr
1633 );
1634
1635 let hba_entry_opt = self.env.frontend_config().hba_config.find_matching_entry(
1636 &connection_type,
1637 database_name,
1638 user_name,
1639 client_addr,
1640 );
1641
1642 let Some(hba_entry_opt) = hba_entry_opt else {
1644 return Err(Box::new(Error::new(
1645 ErrorKind::PermissionDenied,
1646 format!(
1647 "no pg_hba.conf entry for host \"{peer_addr}\", user \"{user_name}\", database \"{database_name}\""
1648 ),
1649 )));
1650 };
1651
1652 let authenticator_by_info = || -> std::result::Result<UserAuthenticator, BoxedError> {
1654 let authenticator = match &user.auth_info {
1655 None => UserAuthenticator::None,
1656 Some(auth_info) => match auth_info.encryption_type() {
1657 EncryptionType::Plaintext => {
1658 UserAuthenticator::ClearText(auth_info.encrypted_value.clone())
1659 }
1660 EncryptionType::Md5 => {
1661 let mut salt = [0; 4];
1662 let mut rng = rand::rng();
1663 rng.fill_bytes(&mut salt);
1664 UserAuthenticator::Md5WithSalt {
1665 encrypted_password: md5_hash_with_salt(
1666 &auth_info.encrypted_value,
1667 &salt,
1668 ),
1669 salt,
1670 }
1671 }
1672 EncryptionType::Oauth => UserAuthenticator::OAuth {
1673 metadata: auth_info.metadata.clone(),
1674 cluster_id: self.env.meta_client().cluster_id().to_owned(),
1675 },
1676 _ => {
1677 return Err(Box::new(Error::new(
1678 ErrorKind::Unsupported,
1679 format!(
1680 "Unsupported auth type: {}",
1681 auth_info.encryption_type().as_str_name()
1682 ),
1683 )));
1684 }
1685 },
1686 };
1687 Ok(authenticator)
1688 };
1689
1690 let user_authenticator = match (&hba_entry_opt.auth_method, &user.auth_info) {
1691 (AuthMethod::Trust, _) => UserAuthenticator::None,
1692 (AuthMethod::Password, _) => authenticator_by_info()?,
1694 (AuthMethod::Md5, Some(auth_info))
1695 if auth_info.encryption_type() == EncryptionType::Md5 =>
1696 {
1697 authenticator_by_info()?
1698 }
1699 (AuthMethod::OAuth, Some(auth_info))
1700 if auth_info.encryption_type() == EncryptionType::Oauth =>
1701 {
1702 authenticator_by_info()?
1703 }
1704 (AuthMethod::Ldap, _) => {
1705 UserAuthenticator::Ldap(user_name.to_owned(), hba_entry_opt.clone())
1706 }
1707 _ => {
1708 return Err(Box::new(Error::new(
1709 ErrorKind::PermissionDenied,
1710 format!("password authentication failed for user \"{user_name}\""),
1711 )));
1712 }
1713 };
1714
1715 let secret_key = self.number.fetch_add(1, Ordering::Relaxed);
1717 let id = (secret_key, secret_key);
1719 let session_config = self.env.session_params_snapshot();
1721
1722 let session_impl: Arc<SessionImpl> = SessionImpl::new(
1723 self.env.clone(),
1724 AuthContext::new(database_name.to_owned(), user_name.to_owned(), user.id),
1725 user_authenticator,
1726 id,
1727 peer_addr,
1728 session_config,
1729 )
1730 .into();
1731 self.insert_session(session_impl.clone());
1732
1733 Ok(session_impl)
1734 } else {
1735 Err(Box::new(Error::new(
1736 ErrorKind::InvalidInput,
1737 format!("Role {} does not exist", user_name),
1738 )))
1739 }
1740 }
1741}
1742
1743impl Session for SessionImpl {
1744 type Portal = Portal;
1745 type PreparedStatement = PrepareStatement;
1746 type ValuesStream = PgResponseStream;
1747
1748 async fn run_one_query(
1751 self: Arc<Self>,
1752 stmt: Statement,
1753 format: Format,
1754 ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
1755 let string = stmt.to_string();
1756 let sql_str = string.as_str();
1757 let sql: Arc<str> = Arc::from(sql_str);
1758 drop(string);
1760 let rsp = handle(self, stmt, sql, vec![format]).await?;
1761 Ok(rsp)
1762 }
1763
1764 fn user_authenticator(&self) -> &UserAuthenticator {
1765 &self.user_authenticator
1766 }
1767
1768 fn id(&self) -> SessionId {
1769 self.id
1770 }
1771
1772 async fn parse(
1773 self: Arc<Self>,
1774 statement: Option<Statement>,
1775 params_types: Vec<Option<DataType>>,
1776 ) -> std::result::Result<PrepareStatement, BoxedError> {
1777 Ok(if let Some(statement) = statement {
1778 handle_parse(self, statement, params_types).await?
1779 } else {
1780 PrepareStatement::Empty
1781 })
1782 }
1783
1784 fn bind(
1785 self: Arc<Self>,
1786 prepare_statement: PrepareStatement,
1787 params: Vec<Option<Bytes>>,
1788 param_formats: Vec<Format>,
1789 result_formats: Vec<Format>,
1790 ) -> std::result::Result<Portal, BoxedError> {
1791 Ok(handle_bind(
1792 prepare_statement,
1793 params,
1794 param_formats,
1795 result_formats,
1796 )?)
1797 }
1798
1799 async fn execute(
1800 self: Arc<Self>,
1801 portal: Portal,
1802 ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
1803 let rsp = handle_execute(self, portal).await?;
1804 Ok(rsp)
1805 }
1806
1807 fn describe_statement(
1808 self: Arc<Self>,
1809 prepare_statement: PrepareStatement,
1810 ) -> std::result::Result<(Vec<DataType>, Vec<PgFieldDescriptor>), BoxedError> {
1811 Ok(match prepare_statement {
1812 PrepareStatement::Empty => (vec![], vec![]),
1813 PrepareStatement::Prepared(prepare_statement) => (
1814 prepare_statement.bound_result.param_types,
1815 infer(
1816 Some(prepare_statement.bound_result.bound),
1817 prepare_statement.statement,
1818 )?,
1819 ),
1820 PrepareStatement::PureStatement(statement) => (vec![], infer(None, statement)?),
1821 })
1822 }
1823
1824 fn describe_portal(
1825 self: Arc<Self>,
1826 portal: Portal,
1827 ) -> std::result::Result<Vec<PgFieldDescriptor>, BoxedError> {
1828 match portal {
1829 Portal::Empty => Ok(vec![]),
1830 Portal::Portal(portal) => {
1831 let mut columns = infer(Some(portal.bound_result.bound), portal.statement)?;
1832 let formats = FormatIterator::new(&portal.result_formats, columns.len())?;
1833 columns.iter_mut().zip_eq_fast(formats).for_each(|(c, f)| {
1834 if f == Format::Binary {
1835 c.set_to_binary()
1836 }
1837 });
1838 Ok(columns)
1839 }
1840 Portal::PureStatement(statement) => Ok(infer(None, statement)?),
1841 }
1842 }
1843
1844 fn get_config(&self, key: &str) -> std::result::Result<String, BoxedError> {
1845 self.config().get(key).map_err(Into::into)
1846 }
1847
1848 fn set_config(&self, key: &str, value: String) -> std::result::Result<String, BoxedError> {
1849 Self::set_config(self, key, value).map_err(Into::into)
1850 }
1851
1852 async fn next_notice(self: &Arc<Self>) -> String {
1853 std::future::poll_fn(|cx| self.clone().notice_rx.lock().poll_recv(cx))
1854 .await
1855 .expect("notice channel should not be closed")
1856 }
1857
1858 fn transaction_status(&self) -> TransactionStatus {
1859 match &*self.txn.lock() {
1860 transaction::State::Initial | transaction::State::Implicit(_) => {
1861 TransactionStatus::Idle
1862 }
1863 transaction::State::Explicit(_) => TransactionStatus::InTransaction,
1864 }
1866 }
1867
1868 fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard {
1870 let exec_context = Arc::new(ExecContext {
1871 running_sql: sql,
1872 last_instant: Instant::now(),
1873 last_idle_instant: self.last_idle_instant.clone(),
1874 });
1875 *self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
1876 *self.last_idle_instant.lock() = None;
1878 ExecContextGuard::new(exec_context)
1879 }
1880
1881 fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
1884 if matches!(self.transaction_status(), TransactionStatus::InTransaction) {
1886 let idle_in_transaction_session_timeout =
1887 self.config().idle_in_transaction_session_timeout() as u128;
1888 if idle_in_transaction_session_timeout != 0 {
1890 let guard = self.exec_context.lock();
1892 if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() {
1894 if let Some(elapse_since_last_idle_instant) =
1896 self.elapse_since_last_idle_instant()
1897 && elapse_since_last_idle_instant > idle_in_transaction_session_timeout
1898 {
1899 return Err(PsqlError::IdleInTxnTimeout);
1900 }
1901 }
1902 }
1903 }
1904 Ok(())
1905 }
1906}
1907
1908fn infer(bound: Option<BoundStatement>, stmt: Statement) -> Result<Vec<PgFieldDescriptor>> {
1910 match stmt {
1911 Statement::Query(_)
1912 | Statement::Insert { .. }
1913 | Statement::Delete { .. }
1914 | Statement::Update { .. }
1915 | Statement::FetchCursor { .. } => Ok(bound
1916 .unwrap()
1917 .output_fields()
1918 .iter()
1919 .map(to_pg_field)
1920 .collect()),
1921 Statement::ShowObjects {
1922 object: show_object,
1923 ..
1924 } => Ok(infer_show_object(&show_object)),
1925 Statement::ShowCreateObject { .. } => Ok(infer_show_create_object()),
1926 Statement::ShowTransactionIsolationLevel => {
1927 let name = "transaction_isolation";
1928 Ok(infer_show_variable(name))
1929 }
1930 Statement::ShowVariable { variable } => {
1931 let name = &variable[0].real_value().to_lowercase();
1932 Ok(infer_show_variable(name))
1933 }
1934 Statement::Describe { name: _, kind } => Ok(infer_describe(&kind)),
1935 Statement::Explain { .. } => Ok(vec![PgFieldDescriptor::new(
1936 "QUERY PLAN".to_owned(),
1937 DataType::Varchar.to_oid(),
1938 DataType::Varchar.type_len(),
1939 )]),
1940 _ => Ok(vec![]),
1941 }
1942}
1943
/// A worker id paired with a process id; displayed and parsed as `worker_id:process_id`.
pub struct WorkerProcessId {
    /// Id of the worker node.
    pub worker_id: WorkerId,
    /// Process id on that worker.
    pub process_id: i32,
}
1948
1949impl WorkerProcessId {
1950 pub fn new(worker_id: WorkerId, process_id: i32) -> Self {
1951 Self {
1952 worker_id,
1953 process_id,
1954 }
1955 }
1956}
1957
1958impl Display for WorkerProcessId {
1959 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1960 write!(f, "{}:{}", self.worker_id, self.process_id)
1961 }
1962}
1963
1964impl TryFrom<String> for WorkerProcessId {
1965 type Error = String;
1966
1967 fn try_from(worker_process_id: String) -> std::result::Result<Self, Self::Error> {
1968 const INVALID: &str = "invalid WorkerProcessId";
1969 let splits: Vec<&str> = worker_process_id.split(":").collect();
1970 if splits.len() != 2 {
1971 return Err(INVALID.to_owned());
1972 }
1973 let Ok(worker_id) = splits[0].parse::<u32>() else {
1974 return Err(INVALID.to_owned());
1975 };
1976 let Ok(process_id) = splits[1].parse::<i32>() else {
1977 return Err(INVALID.to_owned());
1978 };
1979 Ok(WorkerProcessId::new(worker_id.into(), process_id))
1980 }
1981}
1982
1983pub fn cancel_queries_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
1984 let guard = sessions_map.read();
1985 if let Some(session) = guard.get(&session_id) {
1986 session.cancel_current_query();
1987 true
1988 } else {
1989 info!("Current session finished, ignoring cancel query request");
1990 false
1991 }
1992}
1993
1994pub fn cancel_creating_jobs_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
1995 let guard = sessions_map.read();
1996 if let Some(session) = guard.get(&session_id) {
1997 session.cancel_current_creating_job();
1998 true
1999 } else {
2000 info!("Current session finished, ignoring cancel creating request");
2001 false
2002 }
2003}