Skip to main content

risingwave_frontend/
session.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::io::{Error, ErrorKind};
18use std::iter;
19use std::net::{IpAddr, Ipv4Addr, SocketAddr};
20use std::sync::atomic::{AtomicI32, Ordering};
21use std::sync::{Arc, Weak};
22use std::time::{Duration, Instant};
23
24use anyhow::anyhow;
25use bytes::Bytes;
26use either::Either;
27use itertools::Itertools;
28use parking_lot::{Mutex, RwLock, RwLockReadGuard};
29use pgwire::error::{PsqlError, PsqlResult};
30use pgwire::net::{Address, AddressRef};
31use pgwire::pg_field_descriptor::PgFieldDescriptor;
32use pgwire::pg_message::TransactionStatus;
33use pgwire::pg_response::{PgResponse, StatementType};
34use pgwire::pg_server::{
35    BoxedError, ExecContext, ExecContextGuard, Session, SessionId, SessionManager,
36    UserAuthenticator,
37};
38use pgwire::types::{Format, FormatIterator};
39use prometheus_http_query::Client as PrometheusClient;
40use rand::RngCore;
41use risingwave_batch::monitor::{BatchSpillMetrics, GLOBAL_BATCH_SPILL_METRICS};
42use risingwave_batch::spill::spill_op::SpillOp;
43use risingwave_batch::task::{ShutdownSender, ShutdownToken};
44use risingwave_batch::worker_manager::worker_node_manager::{
45    WorkerNodeManager, WorkerNodeManagerRef,
46};
47use risingwave_common::acl::AclMode;
48#[cfg(test)]
49use risingwave_common::catalog::{
50    DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
51};
52use risingwave_common::config::{
53    AuthMethod, BatchConfig, ConnectionType, FrontendConfig, MetaConfig, MetricLevel,
54    RpcClientConfig, StreamingConfig, UdfConfig, load_config,
55};
56use risingwave_common::id::WorkerId;
57use risingwave_common::memory::MemoryContext;
58use risingwave_common::secret::LocalSecretManager;
59use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
60use risingwave_common::system_param::local_manager::{
61    LocalSystemParamsManager, LocalSystemParamsManagerRef,
62};
63use risingwave_common::telemetry::manager::TelemetryManager;
64use risingwave_common::telemetry::telemetry_env_enabled;
65use risingwave_common::types::DataType;
66use risingwave_common::util::addr::HostAddr;
67use risingwave_common::util::cluster_limit;
68use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
69use risingwave_common::util::iter_util::ZipEqFast;
70use risingwave_common::util::pretty_bytes::convert;
71use risingwave_common::util::runtime::BackgroundShutdownRuntime;
72use risingwave_common::{GIT_SHA, RW_VERSION};
73use risingwave_common_heap_profiling::HeapProfiler;
74use risingwave_common_service::{MetricsManager, ObserverManager};
75use risingwave_connector::source::monitor::{GLOBAL_SOURCE_METRICS, SourceMetrics};
76use risingwave_pb::common::WorkerType;
77use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
78use risingwave_pb::configured_monitor_service_server;
79use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
80use risingwave_pb::health::health_server::HealthServer;
81use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
82use risingwave_pb::user::auth_info::EncryptionType;
83use risingwave_rpc_client::{
84    ComputeClientPool, ComputeClientPoolRef, FrontendClientPool, FrontendClientPoolRef, MetaClient,
85    MonitorClientPool, MonitorClientPoolRef,
86};
87use risingwave_sqlparser::ast::{ObjectName, Statement};
88use risingwave_sqlparser::parser::Parser;
89use thiserror::Error;
90use thiserror_ext::AsReport;
91use tokio::runtime::Builder;
92use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
93use tokio::sync::oneshot::Sender;
94use tokio::sync::watch;
95use tokio::task::JoinHandle;
96use tracing::{error, info};
97
98use self::cursor_manager::CursorManager;
99use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError};
100use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl};
101use crate::catalog::connection_catalog::ConnectionCatalog;
102use crate::catalog::root_catalog::{Catalog, SchemaPath};
103use crate::catalog::secret_catalog::SecretCatalog;
104use crate::catalog::source_catalog::SourceCatalog;
105use crate::catalog::subscription_catalog::SubscriptionCatalog;
106use crate::catalog::{
107    CatalogError, CatalogErrorInner, DatabaseId, OwnedByUserCatalog, SchemaId, TableId,
108    check_schema_writable,
109};
110use crate::error::{
111    ErrorCode, Result, RwError, bail_catalog_error, bail_permission_denied, bail_protocol_error,
112};
113use crate::handler::describe::infer_describe;
114use crate::handler::extended_handle::{
115    Portal, PrepareStatement, handle_bind, handle_execute, handle_parse,
116};
117use crate::handler::privilege::ObjectCheckItem;
118use crate::handler::show::{infer_show_create_object, infer_show_object};
119use crate::handler::util::to_pg_field;
120use crate::handler::variable::infer_show_variable;
121use crate::handler::{RwPgResponse, handle};
122use crate::health_service::HealthServiceImpl;
123use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl};
124use crate::monitor::{CursorMetrics, FrontendMetrics, GLOBAL_FRONTEND_METRICS};
125use crate::observer::FrontendObserverNode;
126use crate::rpc::{FrontendServiceImpl, MonitorServiceImpl};
127use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef};
128use crate::scheduler::{
129    DistributedQueryMetrics, GLOBAL_DISTRIBUTED_QUERY_METRICS, HummockSnapshotManager,
130    HummockSnapshotManagerRef, QueryManager,
131};
132use crate::telemetry::FrontendTelemetryCreator;
133use crate::user::UserId;
134use crate::user::user_authentication::md5_hash_with_salt;
135use crate::user::user_manager::UserInfoManager;
136use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl};
137use crate::{FrontendOpts, PgResponseStream, TableCatalog};
138
139pub(crate) mod current;
140pub(crate) mod cursor_manager;
141pub(crate) mod transaction;
142
143/// The global environment for the frontend server.
144#[derive(Clone)]
145pub(crate) struct FrontendEnv {
146    // Different session may access catalog at the same time and catalog is protected by a
147    // RwLock.
148    meta_client: Arc<dyn FrontendMetaClient>,
149    catalog_writer: Arc<dyn CatalogWriter>,
150    catalog_reader: CatalogReader,
151    user_info_writer: Arc<dyn UserInfoWriter>,
152    user_info_reader: UserInfoReader,
153    worker_node_manager: WorkerNodeManagerRef,
154    query_manager: QueryManager,
155    hummock_snapshot_manager: HummockSnapshotManagerRef,
156    system_params_manager: LocalSystemParamsManagerRef,
157    session_params: Arc<RwLock<SessionConfig>>,
158
159    server_addr: HostAddr,
160    client_pool: ComputeClientPoolRef,
161    frontend_client_pool: FrontendClientPoolRef,
162    monitor_client_pool: MonitorClientPoolRef,
163
164    /// Each session is identified by (`process_id`,
165    /// `secret_key`). When Cancel Request received, find corresponding session and cancel all
166    /// running queries.
167    sessions_map: SessionMapRef,
168
169    pub frontend_metrics: Arc<FrontendMetrics>,
170
171    pub cursor_metrics: Arc<CursorMetrics>,
172
173    source_metrics: Arc<SourceMetrics>,
174
175    /// Batch spill metrics
176    spill_metrics: Arc<BatchSpillMetrics>,
177
178    batch_config: BatchConfig,
179    frontend_config: FrontendConfig,
180    #[expect(dead_code)]
181    meta_config: MetaConfig,
182    streaming_config: StreamingConfig,
183    udf_config: UdfConfig,
184
185    /// Track creating streaming jobs, used to cancel creating streaming job when cancel request
186    /// received.
187    creating_streaming_job_tracker: StreamingJobTrackerRef,
188
189    /// Runtime for compute intensive tasks in frontend, e.g. executors in local mode,
190    /// root stage in mpp mode.
191    compute_runtime: Arc<BackgroundShutdownRuntime>,
192
193    /// Memory context used for batch executors in frontend.
194    mem_context: MemoryContext,
195
196    #[cfg(feature = "datafusion")]
197    /// Shared budget context for **DataFusion** spillable operators.
198    /// Created by [`crate::datafusion::execute::memory_ctx::create_df_spillable_budget_ctx`];
199    /// caps total spillable usage so unspillable operators always have headroom.
200    /// Not used by the RisingWave batch engine.
201    df_spillable_budget_ctx: MemoryContext,
202
203    /// Prometheus client for querying metrics.
204    prometheus_client: Option<PrometheusClient>,
205
206    /// The additional selector used when querying Prometheus.
207    prometheus_selector: String,
208}
209
210/// Session map identified by `(process_id, secret_key)`
211pub type SessionMapRef = Arc<RwLock<HashMap<(i32, i32), Arc<SessionImpl>>>>;
212
213/// The proportion of frontend memory used for batch processing.
214const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5;
215
216impl FrontendEnv {
217    pub fn mock() -> Self {
218        use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter};
219
220        let catalog = Arc::new(RwLock::new(Catalog::default()));
221        let meta_client = Arc::new(MockFrontendMetaClient {});
222        let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
223        let catalog_writer = Arc::new(MockCatalogWriter::new(
224            catalog.clone(),
225            hummock_snapshot_manager.clone(),
226        ));
227        let catalog_reader = CatalogReader::new(catalog);
228        let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
229        let user_info_writer = Arc::new(MockUserInfoWriter::new(user_info_manager.clone()));
230        let user_info_reader = UserInfoReader::new(user_info_manager);
231        let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
232        let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
233        let compute_client_pool = Arc::new(ComputeClientPool::for_test());
234        let frontend_client_pool = Arc::new(FrontendClientPool::for_test());
235        let monitor_client_pool = Arc::new(MonitorClientPool::for_test());
236        let query_manager = QueryManager::new(
237            worker_node_manager.clone(),
238            compute_client_pool,
239            catalog_reader.clone(),
240            Arc::new(DistributedQueryMetrics::for_test()),
241            None,
242            None,
243        );
244        let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
245        let client_pool = Arc::new(ComputeClientPool::for_test());
246        let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
247        let runtime = {
248            let mut builder = Builder::new_multi_thread();
249            if let Some(frontend_compute_runtime_worker_threads) =
250                load_config("", FrontendOpts::default())
251                    .batch
252                    .frontend_compute_runtime_worker_threads
253            {
254                builder.worker_threads(frontend_compute_runtime_worker_threads);
255            }
256            builder
257                .thread_name("rw-batch-local")
258                .enable_all()
259                .build()
260                .unwrap()
261        };
262        let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
263        let sessions_map = Arc::new(RwLock::new(HashMap::new()));
264        Self {
265            meta_client,
266            catalog_writer,
267            catalog_reader,
268            user_info_writer,
269            user_info_reader,
270            worker_node_manager,
271            query_manager,
272            hummock_snapshot_manager,
273            system_params_manager,
274            session_params: Default::default(),
275            server_addr,
276            client_pool,
277            frontend_client_pool,
278            monitor_client_pool,
279            sessions_map,
280            frontend_metrics: Arc::new(FrontendMetrics::for_test()),
281            cursor_metrics: Arc::new(CursorMetrics::for_test()),
282            batch_config: BatchConfig::default(),
283            frontend_config: FrontendConfig::default(),
284            meta_config: MetaConfig::default(),
285            streaming_config: StreamingConfig::default(),
286            udf_config: UdfConfig::default(),
287            source_metrics: Arc::new(SourceMetrics::default()),
288            spill_metrics: BatchSpillMetrics::for_test(),
289            creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
290            compute_runtime,
291            mem_context: MemoryContext::none(),
292            #[cfg(feature = "datafusion")]
293            df_spillable_budget_ctx: MemoryContext::none(),
294            prometheus_client: None,
295            prometheus_selector: String::new(),
296        }
297    }
298
299    pub(crate) fn set_frontend_config_for_test(&mut self, frontend_config: FrontendConfig) {
300        self.frontend_config = frontend_config;
301    }
302
303    pub async fn init(opts: FrontendOpts) -> Result<(Self, Vec<JoinHandle<()>>, Vec<Sender<()>>)> {
304        let config = load_config(&opts.config_path, &opts);
305        info!("Starting frontend node");
306        info!("> config: {:?}", config);
307        info!(
308            "> debug assertions: {}",
309            if cfg!(debug_assertions) { "on" } else { "off" }
310        );
311        info!("> version: {} ({})", RW_VERSION, GIT_SHA);
312
313        let frontend_address: HostAddr = opts
314            .advertise_addr
315            .as_ref()
316            .unwrap_or_else(|| {
317                tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
318                &opts.listen_addr
319            })
320            .parse()
321            .unwrap();
322        info!("advertise addr is {}", frontend_address);
323
324        let rpc_addr: HostAddr = opts.frontend_rpc_listener_addr.parse().unwrap();
325        let internal_rpc_host_addr = HostAddr {
326            // Use the host of advertise address for the frontend rpc address.
327            host: frontend_address.host.clone(),
328            port: rpc_addr.port,
329        };
330        // Register in meta by calling `AddWorkerNode` RPC.
331        let (meta_client, system_params_reader) = MetaClient::register_new(
332            opts.meta_addr,
333            WorkerType::Frontend,
334            &frontend_address,
335            AddWorkerNodeProperty {
336                internal_rpc_host_addr: internal_rpc_host_addr.to_string(),
337                ..Default::default()
338            },
339            Arc::new(config.meta.clone()),
340        )
341        .await;
342
343        let worker_id = meta_client.worker_id();
344        info!("Assigned worker node id {}", worker_id);
345
346        let (heartbeat_join_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
347            meta_client.clone(),
348            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
349        );
350        let mut join_handles = vec![heartbeat_join_handle];
351        let mut shutdown_senders = vec![heartbeat_shutdown_sender];
352
353        let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
354        let hummock_snapshot_manager =
355            Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));
356
357        let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
358        let catalog = Arc::new(RwLock::new(Catalog::default()));
359        let catalog_writer = Arc::new(CatalogWriterImpl::new(
360            meta_client.clone(),
361            catalog_updated_rx.clone(),
362            hummock_snapshot_manager.clone(),
363        ));
364        let catalog_reader = CatalogReader::new(catalog.clone());
365
366        let worker_node_manager = Arc::new(WorkerNodeManager::new());
367
368        let compute_client_pool = Arc::new(ComputeClientPool::new(
369            config.batch_exchange_connection_pool_size(),
370            config.batch.developer.compute_client_config.clone(),
371        ));
372        let frontend_client_pool = Arc::new(FrontendClientPool::new(
373            1,
374            config.batch.developer.frontend_client_config.clone(),
375        ));
376        let monitor_client_pool = Arc::new(MonitorClientPool::new(1, RpcClientConfig::default()));
377        let query_manager = QueryManager::new(
378            worker_node_manager.clone(),
379            compute_client_pool.clone(),
380            catalog_reader.clone(),
381            Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
382            config.batch.distributed_query_limit,
383            config.batch.max_batch_queries_per_frontend_node,
384        );
385
386        let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
387        let user_info_reader = UserInfoReader::new(user_info_manager.clone());
388        let user_info_writer = Arc::new(UserInfoWriterImpl::new(
389            meta_client.clone(),
390            catalog_updated_rx,
391        ));
392
393        let system_params_manager =
394            Arc::new(LocalSystemParamsManager::new(system_params_reader.clone()));
395
396        LocalSecretManager::init(
397            opts.temp_secret_file_dir,
398            meta_client.cluster_id().to_owned(),
399            worker_id,
400        );
401
402        // This `session_params` should be initialized during the initial notification in `observer_manager`
403        let session_params = Arc::new(RwLock::new(SessionConfig::default()));
404        let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
405        let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));
406
407        let frontend_observer_node = FrontendObserverNode::new(
408            worker_node_manager.clone(),
409            catalog,
410            catalog_updated_tx,
411            user_info_manager,
412            hummock_snapshot_manager.clone(),
413            system_params_manager.clone(),
414            session_params.clone(),
415            compute_client_pool.clone(),
416        );
417        let observer_manager =
418            ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
419                .await;
420        let observer_join_handle = observer_manager.start().await;
421        join_handles.push(observer_join_handle);
422
423        meta_client.activate(&frontend_address).await?;
424
425        let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
426        let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
427        let spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());
428
429        if config.server.metrics_level > MetricLevel::Disabled {
430            MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
431        }
432
433        let health_srv = HealthServiceImpl::new();
434        let frontend_srv = FrontendServiceImpl::new(sessions_map.clone());
435        let monitor_srv = MonitorServiceImpl::new(config.server.clone());
436        let frontend_rpc_addr = opts.frontend_rpc_listener_addr.parse().unwrap();
437
438        let telemetry_manager = TelemetryManager::new(
439            Arc::new(meta_client.clone()),
440            Arc::new(FrontendTelemetryCreator::new()),
441        );
442
443        // if the toml config file or env variable disables telemetry, do not watch system params
444        // change because if any of configs disable telemetry, we should never start it
445        if config.server.telemetry_enabled && telemetry_env_enabled() {
446            let (join_handle, shutdown_sender) = telemetry_manager.start().await;
447            join_handles.push(join_handle);
448            shutdown_senders.push(shutdown_sender);
449        } else {
450            tracing::info!("Telemetry didn't start due to config");
451        }
452
453        tokio::spawn(async move {
454            tonic::transport::Server::builder()
455                .add_service(HealthServer::new(health_srv))
456                .add_service(FrontendServiceServer::new(frontend_srv))
457                .add_service(configured_monitor_service_server(
458                    MonitorServiceServer::new(monitor_srv),
459                ))
460                .serve(frontend_rpc_addr)
461                .await
462                .unwrap();
463        });
464        info!(
465            "Health Check RPC Listener is set up on {}",
466            opts.frontend_rpc_listener_addr.clone()
467        );
468
469        let creating_streaming_job_tracker =
470            Arc::new(StreamingJobTracker::new(frontend_meta_client.clone()));
471
472        let runtime = {
473            let mut builder = Builder::new_multi_thread();
474            if let Some(frontend_compute_runtime_worker_threads) =
475                config.batch.frontend_compute_runtime_worker_threads
476            {
477                builder.worker_threads(frontend_compute_runtime_worker_threads);
478            }
479            builder
480                .thread_name("rw-batch-local")
481                .enable_all()
482                .build()
483                .unwrap()
484        };
485        let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
486
487        let sessions = sessions_map.clone();
488        // Idle transaction background monitor
489        let join_handle = tokio::spawn(async move {
490            let mut check_idle_txn_interval =
491                tokio::time::interval(core::time::Duration::from_secs(10));
492            check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
493            check_idle_txn_interval.reset();
494            loop {
495                check_idle_txn_interval.tick().await;
496                sessions.read().values().for_each(|session| {
497                    let _ = session.check_idle_in_transaction_timeout();
498                })
499            }
500        });
501        join_handles.push(join_handle);
502
503        // Clean up the spill directory.
504        #[cfg(not(madsim))]
505        if config.batch.enable_spill {
506            SpillOp::clean_spill_directory()
507                .await
508                .map_err(|err| anyhow!(err))?;
509        }
510
511        let total_memory_bytes = opts.frontend_total_memory_bytes;
512        let heap_profiler =
513            HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
514        // Run a background heap profiler
515        heap_profiler.start();
516
517        let batch_memory_limit = total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION;
518        let mem_context = MemoryContext::root(
519            frontend_metrics.batch_total_mem.clone(),
520            batch_memory_limit as u64,
521        );
522        #[cfg(feature = "datafusion")]
523        let df_spillable_budget_ctx =
524            crate::datafusion::create_df_spillable_budget_ctx(&mem_context);
525
526        // Initialize Prometheus client if endpoint is provided
527        let prometheus_client = if let Some(ref endpoint) = opts.prometheus_endpoint {
528            match PrometheusClient::try_from(endpoint.as_str()) {
529                Ok(client) => {
530                    info!("Prometheus client initialized with endpoint: {}", endpoint);
531                    Some(client)
532                }
533                Err(e) => {
534                    error!(
535                        error = %e.as_report(),
536                        "failed to initialize Prometheus client",
537                    );
538                    None
539                }
540            }
541        } else {
542            None
543        };
544
545        // Initialize Prometheus selector
546        let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
547
548        info!(
549            "Frontend  total_memory: {} batch_memory: {}",
550            convert(total_memory_bytes as _),
551            convert(batch_memory_limit as _),
552        );
553
554        Ok((
555            Self {
556                catalog_reader,
557                catalog_writer,
558                user_info_reader,
559                user_info_writer,
560                worker_node_manager,
561                meta_client: frontend_meta_client,
562                query_manager,
563                hummock_snapshot_manager,
564                system_params_manager,
565                session_params,
566                server_addr: frontend_address,
567                client_pool: compute_client_pool,
568                frontend_client_pool,
569                monitor_client_pool,
570                frontend_metrics,
571                cursor_metrics,
572                spill_metrics,
573                sessions_map,
574                batch_config: config.batch,
575                frontend_config: config.frontend,
576                meta_config: config.meta,
577                streaming_config: config.streaming,
578                udf_config: config.udf,
579                source_metrics,
580                creating_streaming_job_tracker,
581                compute_runtime,
582                mem_context,
583                #[cfg(feature = "datafusion")]
584                df_spillable_budget_ctx,
585                prometheus_client,
586                prometheus_selector,
587            },
588            join_handles,
589            shutdown_senders,
590        ))
591    }
592
593    /// Get a reference to the frontend env's catalog writer.
594    ///
595    /// This method is intentionally private, and a write guard is required for the caller to
596    /// prove that the write operations are permitted in the current transaction.
597    fn catalog_writer(&self, _guard: transaction::WriteGuard) -> &dyn CatalogWriter {
598        &*self.catalog_writer
599    }
600
601    /// Get a reference to the frontend env's catalog reader.
602    pub fn catalog_reader(&self) -> &CatalogReader {
603        &self.catalog_reader
604    }
605
606    /// Get a reference to the frontend env's user info writer.
607    ///
608    /// This method is intentionally private, and a write guard is required for the caller to
609    /// prove that the write operations are permitted in the current transaction.
610    fn user_info_writer(&self, _guard: transaction::WriteGuard) -> &dyn UserInfoWriter {
611        &*self.user_info_writer
612    }
613
614    /// Get a reference to the frontend env's user info reader.
615    pub fn user_info_reader(&self) -> &UserInfoReader {
616        &self.user_info_reader
617    }
618
619    pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef {
620        self.worker_node_manager.clone()
621    }
622
623    pub fn meta_client(&self) -> &dyn FrontendMetaClient {
624        &*self.meta_client
625    }
626
627    pub fn meta_client_ref(&self) -> Arc<dyn FrontendMetaClient> {
628        self.meta_client.clone()
629    }
630
631    pub fn query_manager(&self) -> &QueryManager {
632        &self.query_manager
633    }
634
635    pub fn hummock_snapshot_manager(&self) -> &HummockSnapshotManagerRef {
636        &self.hummock_snapshot_manager
637    }
638
639    pub fn system_params_manager(&self) -> &LocalSystemParamsManagerRef {
640        &self.system_params_manager
641    }
642
643    pub fn session_params_snapshot(&self) -> SessionConfig {
644        self.session_params.read_recursive().clone()
645    }
646
647    /// Get a reference to the Prometheus client if available.
648    pub fn prometheus_client(&self) -> Option<&PrometheusClient> {
649        self.prometheus_client.as_ref()
650    }
651
652    /// Get the Prometheus selector string.
653    pub fn prometheus_selector(&self) -> &str {
654        &self.prometheus_selector
655    }
656
657    pub fn server_address(&self) -> &HostAddr {
658        &self.server_addr
659    }
660
661    pub fn client_pool(&self) -> ComputeClientPoolRef {
662        self.client_pool.clone()
663    }
664
665    pub fn frontend_client_pool(&self) -> FrontendClientPoolRef {
666        self.frontend_client_pool.clone()
667    }
668
669    pub fn monitor_client_pool(&self) -> MonitorClientPoolRef {
670        self.monitor_client_pool.clone()
671    }
672
673    pub fn batch_config(&self) -> &BatchConfig {
674        &self.batch_config
675    }
676
677    pub fn frontend_config(&self) -> &FrontendConfig {
678        &self.frontend_config
679    }
680
681    pub fn streaming_config(&self) -> &StreamingConfig {
682        &self.streaming_config
683    }
684
685    pub fn udf_config(&self) -> &UdfConfig {
686        &self.udf_config
687    }
688
689    pub fn source_metrics(&self) -> Arc<SourceMetrics> {
690        self.source_metrics.clone()
691    }
692
693    pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
694        self.spill_metrics.clone()
695    }
696
697    pub fn creating_streaming_job_tracker(&self) -> &StreamingJobTrackerRef {
698        &self.creating_streaming_job_tracker
699    }
700
701    pub fn sessions_map(&self) -> &SessionMapRef {
702        &self.sessions_map
703    }
704
705    pub fn compute_runtime(&self) -> Arc<BackgroundShutdownRuntime> {
706        self.compute_runtime.clone()
707    }
708
709    /// Cancel queries (i.e. batch queries) in session.
710    /// If the session exists return true, otherwise, return false.
711    pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {
712        cancel_queries_in_session(session_id, self.sessions_map.clone())
713    }
714
715    /// Cancel creating jobs (i.e. streaming queries) in session.
716    /// If the session exists return true, otherwise, return false.
717    pub fn cancel_creating_jobs_in_session(&self, session_id: SessionId) -> bool {
718        cancel_creating_jobs_in_session(session_id, self.sessions_map.clone())
719    }
720
721    pub fn mem_context(&self) -> MemoryContext {
722        self.mem_context.clone()
723    }
724
725    #[cfg(feature = "datafusion")]
726    pub fn df_spillable_budget_ctx(&self) -> MemoryContext {
727        self.df_spillable_budget_ctx.clone()
728    }
729}
730
731#[derive(Clone)]
732pub struct AuthContext {
733    pub database: String,
734    pub user_name: String,
735    pub user_id: UserId,
736}
737
738impl AuthContext {
739    pub fn new(database: String, user_name: String, user_id: UserId) -> Self {
740        Self {
741            database,
742            user_name,
743            user_id,
744        }
745    }
746}
747pub struct SessionImpl {
748    env: FrontendEnv,
749    auth_context: Arc<RwLock<AuthContext>>,
750    /// Used for user authentication.
751    user_authenticator: UserAuthenticator,
752    /// Stores the value of configurations.
753    config_map: Arc<RwLock<SessionConfig>>,
754
755    /// Channel sender for frontend handler to send notices.
756    notice_tx: UnboundedSender<String>,
757    /// Channel receiver for pgwire to take notices and send to clients.
758    notice_rx: Mutex<UnboundedReceiver<String>>,
759
760    /// Identified by `process_id`, `secret_key`. Corresponds to `SessionManager`.
761    id: (i32, i32),
762
763    /// Client address
764    peer_addr: AddressRef,
765
766    /// Transaction state.
767    /// TODO: get rid of the `Mutex` here as a workaround if the `Send` requirement of
768    /// async functions, there should actually be no contention.
769    txn: Arc<Mutex<transaction::State>>,
770
771    /// Query cancel flag.
772    /// This flag is set only when current query is executed in local mode, and used to cancel
773    /// local query.
774    current_query_cancel_flag: Mutex<Option<ShutdownSender>>,
775
776    /// execution context represents the lifetime of a running SQL in the current session
777    exec_context: Mutex<Option<Weak<ExecContext>>>,
778
779    /// Last idle instant
780    last_idle_instant: Arc<Mutex<Option<Instant>>>,
781
782    cursor_manager: Arc<CursorManager>,
783
784    /// temporary sources for the current session
785    temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,
786
787    /// staging catalogs for the current session
788    staging_catalog_manager: Arc<Mutex<StagingCatalogManager>>,
789}
790
791/// If TEMPORARY or TEMP is specified, the source is created as a temporary source.
792/// Temporary sources are automatically dropped at the end of a session
793/// Temporary sources are expected to be selected by batch queries, not streaming queries.
794/// Temporary sources currently are only used by cloud portal to preview the data during table and
795/// source creation, so it is a internal feature and not exposed to users.
796/// The current PR supports temporary source with minimum effort,
797/// so we don't care about the database name and schema name, but only care about the source name.
798/// Temporary sources can only be shown via `show sources` command but not other system tables.
799#[derive(Default, Clone)]
800pub struct TemporarySourceManager {
801    sources: HashMap<String, SourceCatalog>,
802}
803
804impl TemporarySourceManager {
805    pub fn new() -> Self {
806        Self {
807            sources: HashMap::new(),
808        }
809    }
810
811    pub fn create_source(&mut self, name: String, source: SourceCatalog) {
812        self.sources.insert(name, source);
813    }
814
815    pub fn drop_source(&mut self, name: &str) {
816        self.sources.remove(name);
817    }
818
819    pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
820        self.sources.get(name)
821    }
822
823    pub fn keys(&self) -> Vec<String> {
824        self.sources.keys().cloned().collect()
825    }
826}
827
828/// Staging catalog manager is used to manage the tables creating in the current session.
829#[derive(Default, Clone)]
830pub struct StagingCatalogManager {
831    // staging tables creating in the current session.
832    tables: HashMap<String, TableCatalog>,
833}
834
835impl StagingCatalogManager {
836    pub fn new() -> Self {
837        Self {
838            tables: HashMap::new(),
839        }
840    }
841
842    pub fn create_table(&mut self, name: String, table: TableCatalog) {
843        self.tables.insert(name, table);
844    }
845
846    pub fn drop_table(&mut self, name: &str) {
847        self.tables.remove(name);
848    }
849
850    pub fn get_table(&self, name: &str) -> Option<&TableCatalog> {
851        self.tables.get(name)
852    }
853}
854
855#[derive(Error, Debug)]
856pub enum CheckRelationError {
857    #[error("{0}")]
858    Resolve(#[from] ResolveQualifiedNameError),
859    #[error("{0}")]
860    Catalog(#[from] CatalogError),
861}
862
863impl From<CheckRelationError> for RwError {
864    fn from(e: CheckRelationError) -> Self {
865        match e {
866            CheckRelationError::Resolve(e) => e.into(),
867            CheckRelationError::Catalog(e) => e.into(),
868        }
869    }
870}
871
872impl SessionImpl {
873    pub(crate) fn new(
874        env: FrontendEnv,
875        auth_context: AuthContext,
876        user_authenticator: UserAuthenticator,
877        id: SessionId,
878        peer_addr: AddressRef,
879        session_config: SessionConfig,
880    ) -> Self {
881        let cursor_metrics = env.cursor_metrics.clone();
882        let (notice_tx, notice_rx) = mpsc::unbounded_channel();
883
884        Self {
885            env,
886            auth_context: Arc::new(RwLock::new(auth_context)),
887            user_authenticator,
888            config_map: Arc::new(RwLock::new(session_config)),
889            id,
890            peer_addr,
891            txn: Default::default(),
892            current_query_cancel_flag: Mutex::new(None),
893            notice_tx,
894            notice_rx: Mutex::new(notice_rx),
895            exec_context: Mutex::new(None),
896            last_idle_instant: Default::default(),
897            cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
898            temporary_source_manager: Default::default(),
899            staging_catalog_manager: Default::default(),
900        }
901    }
902
903    #[cfg(test)]
904    pub fn mock() -> Self {
905        let env = FrontendEnv::mock();
906        let (notice_tx, notice_rx) = mpsc::unbounded_channel();
907
908        Self {
909            env: FrontendEnv::mock(),
910            auth_context: Arc::new(RwLock::new(AuthContext::new(
911                DEFAULT_DATABASE_NAME.to_owned(),
912                DEFAULT_SUPER_USER.to_owned(),
913                DEFAULT_SUPER_USER_ID,
914            ))),
915            user_authenticator: UserAuthenticator::None,
916            config_map: Default::default(),
917            // Mock session use non-sense id.
918            id: (0, 0),
919            txn: Default::default(),
920            current_query_cancel_flag: Mutex::new(None),
921            notice_tx,
922            notice_rx: Mutex::new(notice_rx),
923            exec_context: Mutex::new(None),
924            peer_addr: Address::Tcp(SocketAddr::new(
925                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
926                8080,
927            ))
928            .into(),
929            last_idle_instant: Default::default(),
930            cursor_manager: Arc::new(CursorManager::new(env.cursor_metrics)),
931            temporary_source_manager: Default::default(),
932            staging_catalog_manager: Default::default(),
933        }
934    }
935
936    pub(crate) fn env(&self) -> &FrontendEnv {
937        &self.env
938    }
939
940    pub fn auth_context(&self) -> Arc<AuthContext> {
941        let ctx = self.auth_context.read();
942        Arc::new(ctx.clone())
943    }
944
945    pub fn database(&self) -> String {
946        self.auth_context.read().database.clone()
947    }
948
949    pub fn database_id(&self) -> DatabaseId {
950        let db_name = self.database();
951        self.env
952            .catalog_reader()
953            .read_guard()
954            .get_database_by_name(&db_name)
955            .map(|db| db.id())
956            .expect("session database not found")
957    }
958
959    pub fn user_name(&self) -> String {
960        self.auth_context.read().user_name.clone()
961    }
962
963    pub fn user_id(&self) -> UserId {
964        self.auth_context.read().user_id
965    }
966
967    pub fn update_database(&self, database: String) {
968        self.auth_context.write().database = database;
969    }
970
971    pub fn shared_config(&self) -> Arc<RwLock<SessionConfig>> {
972        Arc::clone(&self.config_map)
973    }
974
975    pub fn config(&self) -> RwLockReadGuard<'_, SessionConfig> {
976        self.config_map.read()
977    }
978
979    pub fn set_config(&self, key: &str, value: String) -> Result<String> {
980        self.config_map
981            .write()
982            .set(key, value, &mut ())
983            .map_err(Into::into)
984    }
985
986    pub fn reset_config(&self, key: &str) -> Result<String> {
987        self.config_map
988            .write()
989            .reset(key, &mut ())
990            .map_err(Into::into)
991    }
992
993    pub fn set_config_report(
994        &self,
995        key: &str,
996        value: Option<String>,
997        mut reporter: impl ConfigReporter,
998    ) -> Result<String> {
999        if let Some(value) = value {
1000            self.config_map
1001                .write()
1002                .set(key, value, &mut reporter)
1003                .map_err(Into::into)
1004        } else {
1005            self.config_map
1006                .write()
1007                .reset(key, &mut reporter)
1008                .map_err(Into::into)
1009        }
1010    }
1011
1012    pub fn session_id(&self) -> SessionId {
1013        self.id
1014    }
1015
1016    pub fn running_sql(&self) -> Option<Arc<str>> {
1017        self.exec_context
1018            .lock()
1019            .as_ref()
1020            .and_then(|weak| weak.upgrade())
1021            .map(|context| context.running_sql.clone())
1022    }
1023
1024    pub fn get_cursor_manager(&self) -> Arc<CursorManager> {
1025        self.cursor_manager.clone()
1026    }
1027
1028    pub fn peer_addr(&self) -> &Address {
1029        &self.peer_addr
1030    }
1031
1032    pub fn elapse_since_running_sql(&self) -> Option<u128> {
1033        self.exec_context
1034            .lock()
1035            .as_ref()
1036            .and_then(|weak| weak.upgrade())
1037            .map(|context| context.last_instant.elapsed().as_millis())
1038    }
1039
1040    pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
1041        self.last_idle_instant
1042            .lock()
1043            .as_ref()
1044            .map(|x| x.elapsed().as_millis())
1045    }
1046
1047    pub fn check_relation_name_duplicated(
1048        &self,
1049        name: ObjectName,
1050        stmt_type: StatementType,
1051        if_not_exists: bool,
1052    ) -> std::result::Result<Either<(), RwPgResponse>, CheckRelationError> {
1053        let db_name = &self.database();
1054        let catalog_reader = self.env().catalog_reader().read_guard();
1055        let (schema_name, relation_name) = {
1056            let (schema_name, relation_name) =
1057                Binder::resolve_schema_qualified_name(db_name, &name)?;
1058            let search_path = self.config().search_path();
1059            let user_name = &self.user_name();
1060            let schema_name = match schema_name {
1061                Some(schema_name) => schema_name,
1062                None => catalog_reader
1063                    .first_valid_schema(db_name, &search_path, user_name)?
1064                    .name(),
1065            };
1066            (schema_name, relation_name)
1067        };
1068        match catalog_reader.check_relation_name_duplicated(db_name, &schema_name, &relation_name) {
1069            Err(e) if if_not_exists => {
1070                if let CatalogErrorInner::Duplicated {
1071                    name,
1072                    under_creation,
1073                    ..
1074                } = e.inner()
1075                {
1076                    // If relation is created, return directly.
1077                    // Otherwise, the job status is `is_creating`. Since frontend receives the catalog asynchronously, we can't
1078                    // determine the real status of the meta at this time. We regard it as `not_exists` and delay the check to meta.
1079                    // Only the type in StreamingJob (defined in streaming_job.rs) and Subscription may be `is_creating`.
1080                    if !*under_creation {
1081                        Ok(Either::Right(
1082                            PgResponse::builder(stmt_type)
1083                                .notice(format!("relation \"{}\" already exists, skipping", name))
1084                                .into(),
1085                        ))
1086                    } else if stmt_type == StatementType::CREATE_SUBSCRIPTION {
1087                        // For now, when a Subscription is creating, we return directly with an additional message.
1088                        // TODO: Subscription should also be processed in the same way as StreamingJob.
1089                        Ok(Either::Right(
1090                            PgResponse::builder(stmt_type)
1091                                .notice(format!(
1092                                    "relation \"{}\" already exists but still creating, skipping",
1093                                    name
1094                                ))
1095                                .into(),
1096                        ))
1097                    } else {
1098                        Ok(Either::Left(()))
1099                    }
1100                } else {
1101                    Err(e.into())
1102                }
1103            }
1104            Err(e) => Err(e.into()),
1105            Ok(_) => Ok(Either::Left(())),
1106        }
1107    }
1108
1109    pub fn check_secret_name_duplicated(&self, name: ObjectName) -> Result<()> {
1110        let db_name = &self.database();
1111        let catalog_reader = self.env().catalog_reader().read_guard();
1112        let (schema_name, secret_name) = {
1113            let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1114            let search_path = self.config().search_path();
1115            let user_name = &self.user_name();
1116            let schema_name = match schema_name {
1117                Some(schema_name) => schema_name,
1118                None => catalog_reader
1119                    .first_valid_schema(db_name, &search_path, user_name)?
1120                    .name(),
1121            };
1122            (schema_name, secret_name)
1123        };
1124        catalog_reader
1125            .check_secret_name_duplicated(db_name, &schema_name, &secret_name)
1126            .map_err(RwError::from)
1127    }
1128
1129    pub fn check_connection_name_duplicated(&self, name: ObjectName) -> Result<()> {
1130        let db_name = &self.database();
1131        let catalog_reader = self.env().catalog_reader().read_guard();
1132        let (schema_name, connection_name) = {
1133            let (schema_name, connection_name) =
1134                Binder::resolve_schema_qualified_name(db_name, &name)?;
1135            let search_path = self.config().search_path();
1136            let user_name = &self.user_name();
1137            let schema_name = match schema_name {
1138                Some(schema_name) => schema_name,
1139                None => catalog_reader
1140                    .first_valid_schema(db_name, &search_path, user_name)?
1141                    .name(),
1142            };
1143            (schema_name, connection_name)
1144        };
1145        catalog_reader
1146            .check_connection_name_duplicated(db_name, &schema_name, &connection_name)
1147            .map_err(RwError::from)
1148    }
1149
1150    pub fn check_function_name_duplicated(
1151        &self,
1152        stmt_type: StatementType,
1153        name: ObjectName,
1154        arg_types: &[DataType],
1155        if_not_exists: bool,
1156    ) -> Result<Either<(), RwPgResponse>> {
1157        let db_name = &self.database();
1158        let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1159        let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?;
1160
1161        let catalog_reader = self.env().catalog_reader().read_guard();
1162        if catalog_reader
1163            .get_schema_by_id(database_id, schema_id)?
1164            .get_function_by_name_args(&function_name, arg_types)
1165            .is_some()
1166        {
1167            let full_name = format!(
1168                "{function_name}({})",
1169                arg_types.iter().map(|t| t.to_string()).join(",")
1170            );
1171            if if_not_exists {
1172                Ok(Either::Right(
1173                    PgResponse::builder(stmt_type)
1174                        .notice(format!(
1175                            "function \"{}\" already exists, skipping",
1176                            full_name
1177                        ))
1178                        .into(),
1179                ))
1180            } else {
1181                Err(CatalogError::duplicated("function", full_name).into())
1182            }
1183        } else {
1184            Ok(Either::Left(()))
1185        }
1186    }
1187
1188    /// Also check if the user has the privilege to create in the schema.
1189    pub fn get_database_and_schema_id_for_create(
1190        &self,
1191        schema_name: Option<String>,
1192    ) -> Result<(DatabaseId, SchemaId)> {
1193        let db_name = &self.database();
1194
1195        let search_path = self.config().search_path();
1196        let user_name = &self.user_name();
1197
1198        let catalog_reader = self.env().catalog_reader().read_guard();
1199        let schema = match schema_name {
1200            Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
1201            None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
1202        };
1203        let schema_name = schema.name();
1204
1205        check_schema_writable(&schema_name)?;
1206        self.check_privileges(&[ObjectCheckItem::new(
1207            schema.owner(),
1208            AclMode::Create,
1209            schema_name,
1210            schema.id(),
1211        )])?;
1212
1213        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1214        Ok((db_id, schema.id()))
1215    }
1216
1217    pub fn get_connection_by_name(
1218        &self,
1219        schema_name: Option<String>,
1220        connection_name: &str,
1221    ) -> Result<Arc<ConnectionCatalog>> {
1222        let db_name = &self.database();
1223        let search_path = self.config().search_path();
1224        let user_name = &self.user_name();
1225
1226        let catalog_reader = self.env().catalog_reader().read_guard();
1227        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1228        let (connection, _) =
1229            catalog_reader.get_connection_by_name(db_name, schema_path, connection_name)?;
1230
1231        self.check_privileges(&[ObjectCheckItem::new(
1232            connection.owner(),
1233            AclMode::Usage,
1234            connection.name.clone(),
1235            connection.id,
1236        )])?;
1237
1238        Ok(connection.clone())
1239    }
1240
1241    pub fn get_subscription_by_schema_id_name(
1242        &self,
1243        schema_id: SchemaId,
1244        subscription_name: &str,
1245    ) -> Result<Arc<SubscriptionCatalog>> {
1246        let db_name = &self.database();
1247
1248        let catalog_reader = self.env().catalog_reader().read_guard();
1249        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1250        let schema = catalog_reader.get_schema_by_id(db_id, schema_id)?;
1251        let subscription = schema
1252            .get_subscription_by_name(subscription_name)
1253            .ok_or_else(|| {
1254                RwError::from(ErrorCode::ItemNotFound(format!(
1255                    "subscription {} not found",
1256                    subscription_name
1257                )))
1258            })?;
1259        Ok(subscription.clone())
1260    }
1261
1262    pub fn get_subscription_by_name(
1263        &self,
1264        schema_name: Option<String>,
1265        subscription_name: &str,
1266    ) -> Result<Arc<SubscriptionCatalog>> {
1267        let db_name = &self.database();
1268        let search_path = self.config().search_path();
1269        let user_name = &self.user_name();
1270
1271        let catalog_reader = self.env().catalog_reader().read_guard();
1272        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1273        let (subscription, _) =
1274            catalog_reader.get_subscription_by_name(db_name, schema_path, subscription_name)?;
1275        Ok(subscription.clone())
1276    }
1277
1278    pub fn get_table_by_id(&self, table_id: TableId) -> Result<Arc<TableCatalog>> {
1279        let catalog_reader = self.env().catalog_reader().read_guard();
1280        Ok(catalog_reader.get_any_table_by_id(table_id)?.clone())
1281    }
1282
1283    pub fn get_table_by_name(
1284        &self,
1285        table_name: &str,
1286        db_id: DatabaseId,
1287        schema_id: SchemaId,
1288    ) -> Result<Arc<TableCatalog>> {
1289        let catalog_reader = self.env().catalog_reader().read_guard();
1290        let table = catalog_reader
1291            .get_schema_by_id(db_id, schema_id)?
1292            .get_created_table_by_name(table_name)
1293            .ok_or_else(|| {
1294                Error::new(
1295                    ErrorKind::InvalidInput,
1296                    format!("table \"{}\" does not exist", table_name),
1297                )
1298            })?;
1299
1300        self.check_privileges(&[ObjectCheckItem::new(
1301            table.owner(),
1302            AclMode::Select,
1303            table_name.to_owned(),
1304            table.id,
1305        )])?;
1306
1307        Ok(table.clone())
1308    }
1309
1310    pub fn get_secret_by_name(
1311        &self,
1312        schema_name: Option<String>,
1313        secret_name: &str,
1314    ) -> Result<Arc<SecretCatalog>> {
1315        let db_name = &self.database();
1316        let search_path = self.config().search_path();
1317        let user_name = &self.user_name();
1318
1319        let catalog_reader = self.env().catalog_reader().read_guard();
1320        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1321        let (secret, _) = catalog_reader.get_secret_by_name(db_name, schema_path, secret_name)?;
1322
1323        self.check_privileges(&[ObjectCheckItem::new(
1324            secret.owner(),
1325            AclMode::Usage,
1326            secret.name.clone(),
1327            secret.id,
1328        )])?;
1329
1330        Ok(secret.clone())
1331    }
1332
1333    pub async fn list_change_log_epochs(
1334        &self,
1335        table_id: TableId,
1336        min_epoch: u64,
1337        max_count: u32,
1338    ) -> Result<Vec<u64>> {
1339        let Some(max_epoch) = self
1340            .env
1341            .hummock_snapshot_manager()
1342            .acquire()
1343            .version()
1344            .state_table_info
1345            .info()
1346            .get(&table_id)
1347            .map(|s| s.committed_epoch)
1348        else {
1349            return Ok(vec![]);
1350        };
1351        let ret = self
1352            .env
1353            .meta_client_ref()
1354            .get_hummock_table_change_log(
1355                Some(min_epoch),
1356                Some(max_epoch),
1357                Some(iter::once(table_id).collect()),
1358                true,
1359                Some(max_count),
1360            )
1361            .await?;
1362        let Some(e) = ret.get(&table_id) else {
1363            return Ok(vec![]);
1364        };
1365        Ok(e.iter()
1366            .flat_map(|l| l.epochs())
1367            .filter(|e| *e >= min_epoch && *e <= max_epoch)
1368            .take(max_count as usize)
1369            .collect())
1370    }
1371
1372    pub fn clear_cancel_query_flag(&self) {
1373        let mut flag = self.current_query_cancel_flag.lock();
1374        *flag = None;
1375    }
1376
1377    pub fn reset_cancel_query_flag(&self) -> ShutdownToken {
1378        let mut flag = self.current_query_cancel_flag.lock();
1379        let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
1380        *flag = Some(shutdown_tx);
1381        shutdown_rx
1382    }
1383
1384    pub fn set_cancel_query_flag(&self, shutdown_tx: ShutdownSender) {
1385        let mut flag = self.current_query_cancel_flag.lock();
1386        *flag = Some(shutdown_tx);
1387    }
1388
1389    pub fn cancel_current_query(&self) {
1390        let mut flag_guard = self.current_query_cancel_flag.lock();
1391        if let Some(sender) = flag_guard.take() {
1392            info!("Trying to cancel query in local mode.");
1393            // Current running query is in local mode
1394            sender.cancel();
1395            info!("Cancel query request sent.");
1396        }
1397        info!("Trying to cancel query in distributed mode.");
1398        self.env.query_manager().cancel_queries_in_session(self.id)
1399    }
1400
1401    pub fn cancel_current_creating_job(&self) {
1402        self.env.creating_streaming_job_tracker.abort_jobs(self.id);
1403    }
1404
1405    /// This function only used for test now.
1406    /// Maybe we can remove it in the future.
1407    pub async fn run_statement(
1408        self: Arc<Self>,
1409        sql: Arc<str>,
1410        formats: Vec<Format>,
1411    ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
1412        // Parse sql.
1413        let mut stmts = Parser::parse_sql(&sql)?;
1414        if stmts.is_empty() {
1415            return Ok(PgResponse::empty_result(
1416                pgwire::pg_response::StatementType::EMPTY,
1417            ));
1418        }
1419        if stmts.len() > 1 {
1420            return Ok(
1421                PgResponse::builder(pgwire::pg_response::StatementType::EMPTY)
1422                    .notice("cannot insert multiple commands into statement")
1423                    .into(),
1424            );
1425        }
1426        let stmt = stmts.swap_remove(0);
1427        let rsp = Box::pin(handle(self, stmt, sql.clone(), formats)).await?;
1428        Ok(rsp)
1429    }
1430
1431    pub fn notice_to_user(&self, str: impl Into<String>) {
1432        let notice = str.into();
1433        tracing::trace!(notice, "notice to user");
1434        self.notice_tx
1435            .send(notice)
1436            .expect("notice channel should not be closed");
1437    }
1438
1439    pub fn is_barrier_read(&self) -> bool {
1440        match self.config().visibility_mode() {
1441            VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
1442            VisibilityMode::All => true,
1443            VisibilityMode::Checkpoint => false,
1444        }
1445    }
1446
1447    pub fn statement_timeout(&self) -> Duration {
1448        if self.config().statement_timeout().millis() == 0 {
1449            Duration::from_secs(self.env.batch_config.statement_timeout_in_sec as u64)
1450        } else {
1451            Duration::from_millis(self.config().statement_timeout().millis() as u64)
1452        }
1453    }
1454
1455    pub fn create_temporary_source(&self, source: SourceCatalog) {
1456        self.temporary_source_manager
1457            .lock()
1458            .create_source(source.name.clone(), source);
1459    }
1460
1461    pub fn get_temporary_source(&self, name: &str) -> Option<SourceCatalog> {
1462        self.temporary_source_manager
1463            .lock()
1464            .get_source(name)
1465            .cloned()
1466    }
1467
1468    pub fn drop_temporary_source(&self, name: &str) {
1469        self.temporary_source_manager.lock().drop_source(name);
1470    }
1471
1472    pub fn temporary_source_manager(&self) -> TemporarySourceManager {
1473        self.temporary_source_manager.lock().clone()
1474    }
1475
1476    pub fn create_staging_table(&self, table: TableCatalog) {
1477        self.staging_catalog_manager
1478            .lock()
1479            .create_table(table.name.clone(), table);
1480    }
1481
1482    pub fn drop_staging_table(&self, name: &str) {
1483        self.staging_catalog_manager.lock().drop_table(name);
1484    }
1485
1486    pub fn staging_catalog_manager(&self) -> StagingCatalogManager {
1487        self.staging_catalog_manager.lock().clone()
1488    }
1489
1490    pub async fn check_cluster_limits(&self) -> Result<()> {
1491        if self.config().bypass_cluster_limits() {
1492            return Ok(());
1493        }
1494
1495        let gen_message = |ActorCountPerParallelism {
1496                               worker_id_to_actor_count,
1497                               hard_limit,
1498                               soft_limit,
1499                           }: ActorCountPerParallelism,
1500                           exceed_hard_limit: bool|
1501         -> String {
1502            let (limit_type, action) = if exceed_hard_limit {
1503                ("critical", "Scale the cluster immediately to proceed.")
1504            } else {
1505                (
1506                    "recommended",
1507                    "Consider scaling the cluster for optimal performance.",
1508                )
1509            };
1510            format!(
1511                r#"Actor count per parallelism exceeds the {limit_type} limit.
1512
1513Depending on your workload, this may overload the cluster and cause performance/stability issues. {action}
1514
1515HINT:
1516- For best practices on managing streaming jobs: https://docs.risingwave.com/operate/manage-a-large-number-of-streaming-jobs
1517- To bypass the check (if the cluster load is acceptable): `[ALTER SYSTEM] SET bypass_cluster_limits TO true`.
1518  See https://docs.risingwave.com/operate/view-configure-runtime-parameters#how-to-configure-runtime-parameters
1519- Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.
1520
1521DETAILS:
1522- hard limit: {hard_limit}
1523- soft limit: {soft_limit}
1524- worker_id_to_actor_count: {worker_id_to_actor_count:?}"#,
1525            )
1526        };
1527
1528        let limits = self.env().meta_client().get_cluster_limits().await?;
1529        for limit in limits {
1530            match limit {
1531                cluster_limit::ClusterLimit::ActorCount(l) => {
1532                    if l.exceed_hard_limit() {
1533                        return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
1534                            l, true,
1535                        ))));
1536                    } else if l.exceed_soft_limit() {
1537                        self.notice_to_user(gen_message(l, false));
1538                    }
1539                }
1540            }
1541        }
1542        Ok(())
1543    }
1544}
1545
1546pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
1547    std::sync::OnceLock::new();
1548
1549pub struct SessionManagerImpl {
1550    env: FrontendEnv,
1551    _join_handles: Vec<JoinHandle<()>>,
1552    _shutdown_senders: Vec<Sender<()>>,
1553    number: AtomicI32,
1554}
1555
1556impl SessionManager for SessionManagerImpl {
1557    type Error = RwError;
1558    type Session = SessionImpl;
1559
1560    fn create_dummy_session(&self, database_id: DatabaseId) -> Result<Arc<Self::Session>> {
1561        let dummy_addr = Address::Tcp(SocketAddr::new(
1562            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1563            5691, // port of meta
1564        ));
1565
1566        // Always use the built-in super user for dummy sessions.
1567        // This avoids permission checks tied to a specific user_id.
1568        self.connect_inner(
1569            database_id,
1570            risingwave_common::catalog::DEFAULT_SUPER_USER,
1571            Arc::new(dummy_addr),
1572        )
1573    }
1574
1575    fn connect(
1576        &self,
1577        database: &str,
1578        user_name: &str,
1579        peer_addr: AddressRef,
1580    ) -> Result<Arc<Self::Session>> {
1581        let catalog_reader = self.env.catalog_reader();
1582        let reader = catalog_reader.read_guard();
1583        let database_id = reader.get_database_by_name(database)?.id();
1584
1585        self.connect_inner(database_id, user_name, peer_addr)
1586    }
1587
1588    /// Used when cancel request happened.
1589    fn cancel_queries_in_session(&self, session_id: SessionId) {
1590        self.env.cancel_queries_in_session(session_id);
1591    }
1592
1593    fn cancel_creating_jobs_in_session(&self, session_id: SessionId) {
1594        self.env.cancel_creating_jobs_in_session(session_id);
1595    }
1596
1597    fn end_session(&self, session: &Self::Session) {
1598        self.delete_session(&session.session_id());
1599    }
1600
1601    async fn shutdown(&self) {
1602        // Clean up the session map.
1603        self.env.sessions_map().write().clear();
1604        // Unregister from the meta service.
1605        self.env.meta_client().try_unregister().await;
1606    }
1607}
1608
1609impl SessionManagerImpl {
1610    pub async fn new(opts: FrontendOpts) -> Result<Self> {
1611        // TODO(shutdown): only save join handles that **need** to be shutdown
1612        let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?;
1613        Ok(Self {
1614            env,
1615            _join_handles: join_handles,
1616            _shutdown_senders: shutdown_senders,
1617            number: AtomicI32::new(0),
1618        })
1619    }
1620
1621    pub(crate) fn env(&self) -> &FrontendEnv {
1622        &self.env
1623    }
1624
1625    fn insert_session(&self, session: Arc<SessionImpl>) {
1626        let active_sessions = {
1627            let mut write_guard = self.env.sessions_map.write();
1628            write_guard.insert(session.id(), session);
1629            write_guard.len()
1630        };
1631        self.env
1632            .frontend_metrics
1633            .active_sessions
1634            .set(active_sessions as i64);
1635    }
1636
1637    fn delete_session(&self, session_id: &SessionId) {
1638        let active_sessions = {
1639            let mut write_guard = self.env.sessions_map.write();
1640            write_guard.remove(session_id);
1641            write_guard.len()
1642        };
1643        self.env
1644            .frontend_metrics
1645            .active_sessions
1646            .set(active_sessions as i64);
1647    }
1648
1649    fn connect_inner(
1650        &self,
1651        database_id: DatabaseId,
1652        user_name: &str,
1653        peer_addr: AddressRef,
1654    ) -> Result<Arc<SessionImpl>> {
1655        let catalog_reader = self.env.catalog_reader();
1656        let reader = catalog_reader.read_guard();
1657        let (database_name, database_owner) = {
1658            let db = reader.get_database_by_id(database_id)?;
1659            (db.name(), db.owner())
1660        };
1661
1662        let user_reader = self.env.user_info_reader();
1663        let reader = user_reader.read_guard();
1664        if let Some(user) = reader.get_user_by_name(user_name) {
1665            if !user.can_login {
1666                bail_permission_denied!("User {} is not allowed to login", user_name);
1667            }
1668            let has_privilege = user.has_privilege(database_id, AclMode::Connect);
1669            if !user.is_super && database_owner != user.id && !has_privilege {
1670                bail_permission_denied!("User does not have CONNECT privilege.");
1671            }
1672
1673            // Check HBA configuration for LDAP authentication
1674            let (connection_type, client_addr) = match peer_addr.as_ref() {
1675                Address::Tcp(socket_addr) => (ConnectionType::Host, Some(&socket_addr.ip())),
1676                Address::Unix(_) => (ConnectionType::Local, None),
1677            };
1678            tracing::debug!(
1679                "receive connection: type={:?}, client_addr={:?}",
1680                connection_type,
1681                client_addr
1682            );
1683
1684            let hba_entry_opt = self.env.frontend_config().hba_config.find_matching_entry(
1685                &connection_type,
1686                database_name,
1687                user_name,
1688                client_addr,
1689            );
1690
1691            let Some(hba_entry_opt) = hba_entry_opt else {
1692                bail_permission_denied!(
1693                    "no pg_hba.conf entry for host \"{peer_addr}\", user \"{user_name}\", database \"{database_name}\""
1694                );
1695            };
1696
1697            // Determine the user authenticator based on the user's auth info.
1698            let authenticator_by_info = || -> Result<UserAuthenticator> {
1699                let authenticator = match &user.auth_info {
1700                    None => UserAuthenticator::None,
1701                    Some(auth_info) => match auth_info.encryption_type() {
1702                        EncryptionType::Plaintext => {
1703                            UserAuthenticator::ClearText(auth_info.encrypted_value.clone())
1704                        }
1705                        EncryptionType::Md5 => {
1706                            let mut salt = [0; 4];
1707                            let mut rng = rand::rng();
1708                            rng.fill_bytes(&mut salt);
1709                            UserAuthenticator::Md5WithSalt {
1710                                encrypted_password: md5_hash_with_salt(
1711                                    &auth_info.encrypted_value,
1712                                    &salt,
1713                                ),
1714                                salt,
1715                            }
1716                        }
1717                        EncryptionType::Oauth => UserAuthenticator::OAuth {
1718                            metadata: auth_info.metadata.clone(),
1719                            cluster_id: self.env.meta_client().cluster_id().to_owned(),
1720                        },
1721                        _ => {
1722                            bail_protocol_error!(
1723                                "Unsupported auth type: {}",
1724                                auth_info.encryption_type().as_str_name()
1725                            );
1726                        }
1727                    },
1728                };
1729                Ok(authenticator)
1730            };
1731
1732            let user_authenticator = match (&hba_entry_opt.auth_method, &user.auth_info) {
1733                (AuthMethod::Trust, _) => UserAuthenticator::None,
1734                // For backward compatibility, we allow password auth method to work with any auth info.
1735                (AuthMethod::Password, _) => authenticator_by_info()?,
1736                (AuthMethod::Md5, Some(auth_info))
1737                    if auth_info.encryption_type() == EncryptionType::Md5 =>
1738                {
1739                    authenticator_by_info()?
1740                }
1741                (AuthMethod::OAuth, Some(auth_info))
1742                    if auth_info.encryption_type() == EncryptionType::Oauth =>
1743                {
1744                    authenticator_by_info()?
1745                }
1746                (AuthMethod::Ldap, _) => {
1747                    UserAuthenticator::Ldap(user_name.to_owned(), hba_entry_opt.clone())
1748                }
1749                _ => {
1750                    bail_permission_denied!(
1751                        "password authentication failed for user \"{user_name}\""
1752                    );
1753                }
1754            };
1755
1756            // Assign a session id and insert into sessions map (for cancel request).
1757            let secret_key = self.number.fetch_add(1, Ordering::Relaxed);
1758            // Use a trivial strategy: process_id and secret_key are equal.
1759            let id = (secret_key, secret_key);
1760            // Read session params snapshot from frontend env.
1761            let session_config = self.env.session_params_snapshot();
1762
1763            let session_impl: Arc<SessionImpl> = SessionImpl::new(
1764                self.env.clone(),
1765                AuthContext::new(database_name.to_owned(), user_name.to_owned(), user.id),
1766                user_authenticator,
1767                id,
1768                peer_addr,
1769                session_config,
1770            )
1771            .into();
1772            self.insert_session(session_impl.clone());
1773
1774            Ok(session_impl)
1775        } else {
1776            bail_catalog_error!("Role {} does not exist", user_name);
1777        }
1778    }
1779}
1780
1781impl Session for SessionImpl {
1782    type Error = RwError;
1783    type Portal = Portal;
1784    type PreparedStatement = PrepareStatement;
1785    type ValuesStream = PgResponseStream;
1786
1787    /// A copy of `run_statement` but exclude the parser part so each run must be at most one
1788    /// statement. The str sql use the `to_string` of AST. Consider Reuse later.
1789    async fn run_one_query(
1790        self: Arc<Self>,
1791        stmt: Statement,
1792        format: Format,
1793    ) -> Result<PgResponse<PgResponseStream>> {
1794        let string = stmt.to_string();
1795        let sql_str = string.as_str();
1796        let sql: Arc<str> = Arc::from(sql_str);
1797        // The handle can be slow. Release potential large String early.
1798        drop(string);
1799        let rsp = Box::pin(handle(self, stmt, sql, vec![format])).await?;
1800        Ok(rsp)
1801    }
1802
1803    fn user_authenticator(&self) -> &UserAuthenticator {
1804        &self.user_authenticator
1805    }
1806
1807    fn id(&self) -> SessionId {
1808        self.id
1809    }
1810
1811    async fn parse(
1812        self: Arc<Self>,
1813        statement: Option<Statement>,
1814        params_types: Vec<Option<DataType>>,
1815    ) -> Result<PrepareStatement> {
1816        Ok(if let Some(statement) = statement {
1817            handle_parse(self, statement, params_types).await?
1818        } else {
1819            PrepareStatement::Empty
1820        })
1821    }
1822
1823    fn bind(
1824        self: Arc<Self>,
1825        prepare_statement: PrepareStatement,
1826        params: Vec<Option<Bytes>>,
1827        param_formats: Vec<Format>,
1828        result_formats: Vec<Format>,
1829    ) -> Result<Portal> {
1830        handle_bind(prepare_statement, params, param_formats, result_formats)
1831    }
1832
1833    async fn execute(self: Arc<Self>, portal: Portal) -> Result<PgResponse<PgResponseStream>> {
1834        let rsp = Box::pin(handle_execute(self, portal)).await?;
1835        Ok(rsp)
1836    }
1837
1838    fn describe_statement(
1839        self: Arc<Self>,
1840        prepare_statement: PrepareStatement,
1841    ) -> Result<(Vec<DataType>, Vec<PgFieldDescriptor>)> {
1842        Ok(match prepare_statement {
1843            PrepareStatement::Empty => (vec![], vec![]),
1844            PrepareStatement::Prepared(prepare_statement) => (
1845                prepare_statement.bound_result.param_types,
1846                infer(
1847                    Some(prepare_statement.bound_result.bound),
1848                    prepare_statement.statement,
1849                )?,
1850            ),
1851            PrepareStatement::PureStatement(statement) => (vec![], infer(None, statement)?),
1852        })
1853    }
1854
1855    fn describe_portal(self: Arc<Self>, portal: Portal) -> Result<Vec<PgFieldDescriptor>> {
1856        match portal {
1857            Portal::Empty => Ok(vec![]),
1858            Portal::Portal(portal) => {
1859                let mut columns = infer(Some(portal.bound_result.bound), portal.statement)?;
1860                let formats = FormatIterator::new(&portal.result_formats, columns.len())
1861                    .map_err(|e| RwError::from(ErrorCode::ProtocolError(e)))?;
1862                columns.iter_mut().zip_eq_fast(formats).for_each(|(c, f)| {
1863                    if f == Format::Binary {
1864                        c.set_to_binary()
1865                    }
1866                });
1867                Ok(columns)
1868            }
1869            Portal::PureStatement(statement) => Ok(infer(None, statement)?),
1870        }
1871    }
1872
1873    fn get_config(&self, key: &str) -> Result<String> {
1874        self.config().get(key).map_err(Into::into)
1875    }
1876
1877    fn set_config(&self, key: &str, value: String) -> Result<String> {
1878        Self::set_config(self, key, value)
1879    }
1880
1881    async fn next_notice(self: &Arc<Self>) -> String {
1882        std::future::poll_fn(|cx| self.clone().notice_rx.lock().poll_recv(cx))
1883            .await
1884            .expect("notice channel should not be closed")
1885    }
1886
1887    fn transaction_status(&self) -> TransactionStatus {
1888        match &*self.txn.lock() {
1889            transaction::State::Initial | transaction::State::Implicit(_) => {
1890                TransactionStatus::Idle
1891            }
1892            transaction::State::Explicit(_) => TransactionStatus::InTransaction,
1893            // TODO: failed transaction
1894        }
1895    }
1896
1897    /// Init and return an `ExecContextGuard` which could be used as a guard to represent the execution flow.
1898    fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard {
1899        let exec_context = Arc::new(ExecContext {
1900            running_sql: sql,
1901            last_instant: Instant::now(),
1902            last_idle_instant: self.last_idle_instant.clone(),
1903        });
1904        *self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
1905        // unset idle state, since there is a sql running
1906        *self.last_idle_instant.lock() = None;
1907        ExecContextGuard::new(exec_context)
1908    }
1909
1910    /// Check whether idle transaction timeout.
1911    /// If yes, unpin snapshot and return an `IdleInTxnTimeout` error.
1912    fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
1913        // In transaction.
1914        if matches!(self.transaction_status(), TransactionStatus::InTransaction) {
1915            let idle_in_transaction_session_timeout =
1916                self.config().idle_in_transaction_session_timeout() as u128;
1917            // Idle transaction timeout has been enabled.
1918            if idle_in_transaction_session_timeout != 0 {
1919                // Hold the `exec_context` lock to ensure no new sql coming when unpin_snapshot.
1920                let guard = self.exec_context.lock();
1921                // No running sql i.e. idle
1922                if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() {
1923                    // Idle timeout.
1924                    if let Some(elapse_since_last_idle_instant) =
1925                        self.elapse_since_last_idle_instant()
1926                        && elapse_since_last_idle_instant > idle_in_transaction_session_timeout
1927                    {
1928                        return Err(PsqlError::IdleInTxnTimeout);
1929                    }
1930                }
1931            }
1932        }
1933        Ok(())
1934    }
1935
1936    fn user(&self) -> String {
1937        self.user_name()
1938    }
1939}
1940
1941/// Returns row description of the statement
1942fn infer(bound: Option<BoundStatement>, stmt: Statement) -> Result<Vec<PgFieldDescriptor>> {
1943    match stmt {
1944        Statement::Query(_)
1945        | Statement::Insert { .. }
1946        | Statement::Delete { .. }
1947        | Statement::Update { .. }
1948        | Statement::FetchCursor { .. } => Ok(bound
1949            .unwrap()
1950            .output_fields()
1951            .iter()
1952            .map(to_pg_field)
1953            .collect()),
1954        Statement::ShowObjects {
1955            object: show_object,
1956            ..
1957        } => Ok(infer_show_object(&show_object)),
1958        Statement::ShowCreateObject { .. } => Ok(infer_show_create_object()),
1959        Statement::ShowTransactionIsolationLevel => {
1960            let name = "transaction_isolation";
1961            Ok(infer_show_variable(name))
1962        }
1963        Statement::ShowVariable { variable } => {
1964            let name = &variable[0].real_value().to_lowercase();
1965            Ok(infer_show_variable(name))
1966        }
1967        Statement::Describe { name: _, kind } => Ok(infer_describe(&kind)),
1968        Statement::Explain { .. } => Ok(vec![PgFieldDescriptor::new(
1969            "QUERY PLAN".to_owned(),
1970            DataType::Varchar.to_oid(),
1971            DataType::Varchar.type_len(),
1972        )]),
1973        _ => Ok(vec![]),
1974    }
1975}
1976
1977pub struct WorkerProcessId {
1978    pub worker_id: WorkerId,
1979    pub process_id: i32,
1980}
1981
1982impl WorkerProcessId {
1983    pub fn new(worker_id: WorkerId, process_id: i32) -> Self {
1984        Self {
1985            worker_id,
1986            process_id,
1987        }
1988    }
1989}
1990
1991impl Display for WorkerProcessId {
1992    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1993        write!(f, "{}:{}", self.worker_id, self.process_id)
1994    }
1995}
1996
1997impl TryFrom<String> for WorkerProcessId {
1998    type Error = String;
1999
2000    fn try_from(worker_process_id: String) -> std::result::Result<Self, Self::Error> {
2001        const INVALID: &str = "invalid WorkerProcessId";
2002        let splits: Vec<&str> = worker_process_id.split(":").collect();
2003        if splits.len() != 2 {
2004            return Err(INVALID.to_owned());
2005        }
2006        let Ok(worker_id) = splits[0].parse::<u32>() else {
2007            return Err(INVALID.to_owned());
2008        };
2009        let Ok(process_id) = splits[1].parse::<i32>() else {
2010            return Err(INVALID.to_owned());
2011        };
2012        Ok(WorkerProcessId::new(worker_id.into(), process_id))
2013    }
2014}
2015
2016pub fn cancel_queries_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2017    let guard = sessions_map.read();
2018    if let Some(session) = guard.get(&session_id) {
2019        session.cancel_current_query();
2020        true
2021    } else {
2022        info!("Current session finished, ignoring cancel query request");
2023        false
2024    }
2025}
2026
2027pub fn cancel_creating_jobs_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2028    let guard = sessions_map.read();
2029    if let Some(session) = guard.get(&session_id) {
2030        session.cancel_current_creating_job();
2031        true
2032    } else {
2033        info!("Current session finished, ignoring cancel creating request");
2034        false
2035    }
2036}