risingwave_frontend/
session.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::io::{Error, ErrorKind};
18use std::iter;
19use std::net::{IpAddr, Ipv4Addr, SocketAddr};
20use std::sync::atomic::{AtomicI32, Ordering};
21use std::sync::{Arc, Weak};
22use std::time::{Duration, Instant};
23
24use anyhow::anyhow;
25use bytes::Bytes;
26use either::Either;
27use itertools::Itertools;
28use parking_lot::{Mutex, RwLock, RwLockReadGuard};
29use pgwire::error::{PsqlError, PsqlResult};
30use pgwire::net::{Address, AddressRef};
31use pgwire::pg_field_descriptor::PgFieldDescriptor;
32use pgwire::pg_message::TransactionStatus;
33use pgwire::pg_response::{PgResponse, StatementType};
34use pgwire::pg_server::{
35    BoxedError, ExecContext, ExecContextGuard, Session, SessionId, SessionManager,
36    UserAuthenticator,
37};
38use pgwire::types::{Format, FormatIterator};
39use prometheus_http_query::Client as PrometheusClient;
40use rand::RngCore;
41use risingwave_batch::monitor::{BatchSpillMetrics, GLOBAL_BATCH_SPILL_METRICS};
42use risingwave_batch::spill::spill_op::SpillOp;
43use risingwave_batch::task::{ShutdownSender, ShutdownToken};
44use risingwave_batch::worker_manager::worker_node_manager::{
45    WorkerNodeManager, WorkerNodeManagerRef,
46};
47use risingwave_common::acl::AclMode;
48#[cfg(test)]
49use risingwave_common::catalog::{
50    DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
51};
52use risingwave_common::config::{
53    AuthMethod, BatchConfig, ConnectionType, FrontendConfig, MetaConfig, MetricLevel,
54    RpcClientConfig, StreamingConfig, UdfConfig, load_config,
55};
56use risingwave_common::id::WorkerId;
57use risingwave_common::memory::MemoryContext;
58use risingwave_common::secret::LocalSecretManager;
59use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
60use risingwave_common::system_param::local_manager::{
61    LocalSystemParamsManager, LocalSystemParamsManagerRef,
62};
63use risingwave_common::telemetry::manager::TelemetryManager;
64use risingwave_common::telemetry::telemetry_env_enabled;
65use risingwave_common::types::DataType;
66use risingwave_common::util::addr::HostAddr;
67use risingwave_common::util::cluster_limit;
68use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
69use risingwave_common::util::iter_util::ZipEqFast;
70use risingwave_common::util::pretty_bytes::convert;
71use risingwave_common::util::runtime::BackgroundShutdownRuntime;
72use risingwave_common::{GIT_SHA, RW_VERSION};
73use risingwave_common_heap_profiling::HeapProfiler;
74use risingwave_common_service::{MetricsManager, ObserverManager};
75use risingwave_connector::source::monitor::{GLOBAL_SOURCE_METRICS, SourceMetrics};
76use risingwave_pb::common::WorkerType;
77use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
78use risingwave_pb::configured_monitor_service_server;
79use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
80use risingwave_pb::health::health_server::HealthServer;
81use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
82use risingwave_pb::user::auth_info::EncryptionType;
83use risingwave_rpc_client::{
84    ComputeClientPool, ComputeClientPoolRef, FrontendClientPool, FrontendClientPoolRef, MetaClient,
85    MonitorClientPool, MonitorClientPoolRef,
86};
87use risingwave_sqlparser::ast::{ObjectName, Statement};
88use risingwave_sqlparser::parser::Parser;
89use thiserror::Error;
90use thiserror_ext::AsReport;
91use tokio::runtime::Builder;
92use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
93use tokio::sync::oneshot::Sender;
94use tokio::sync::watch;
95use tokio::task::JoinHandle;
96use tracing::{error, info};
97
98use self::cursor_manager::CursorManager;
99use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError};
100use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl};
101use crate::catalog::connection_catalog::ConnectionCatalog;
102use crate::catalog::root_catalog::{Catalog, SchemaPath};
103use crate::catalog::secret_catalog::SecretCatalog;
104use crate::catalog::source_catalog::SourceCatalog;
105use crate::catalog::subscription_catalog::SubscriptionCatalog;
106use crate::catalog::{
107    CatalogError, CatalogErrorInner, DatabaseId, OwnedByUserCatalog, SchemaId, TableId,
108    check_schema_writable,
109};
110use crate::error::{
111    ErrorCode, Result, RwError, bail_catalog_error, bail_permission_denied, bail_protocol_error,
112};
113use crate::handler::describe::infer_describe;
114use crate::handler::extended_handle::{
115    Portal, PrepareStatement, handle_bind, handle_execute, handle_parse,
116};
117use crate::handler::privilege::ObjectCheckItem;
118use crate::handler::show::{infer_show_create_object, infer_show_object};
119use crate::handler::util::to_pg_field;
120use crate::handler::variable::infer_show_variable;
121use crate::handler::{RwPgResponse, handle};
122use crate::health_service::HealthServiceImpl;
123use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl};
124use crate::monitor::{CursorMetrics, FrontendMetrics, GLOBAL_FRONTEND_METRICS};
125use crate::observer::FrontendObserverNode;
126use crate::rpc::{FrontendServiceImpl, MonitorServiceImpl};
127use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef};
128use crate::scheduler::{
129    DistributedQueryMetrics, GLOBAL_DISTRIBUTED_QUERY_METRICS, HummockSnapshotManager,
130    HummockSnapshotManagerRef, QueryManager,
131};
132use crate::telemetry::FrontendTelemetryCreator;
133use crate::user::UserId;
134use crate::user::user_authentication::md5_hash_with_salt;
135use crate::user::user_manager::UserInfoManager;
136use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl};
137use crate::{FrontendOpts, PgResponseStream, TableCatalog};
138
// NOTE(review): purpose not visible from this chunk — presumably tracks the
// "current" session context; confirm in `current.rs`.
pub(crate) mod current;
// Per-session cursor management (`CursorManager` is stored on `SessionImpl`).
pub(crate) mod cursor_manager;
// Transaction state and guards (`transaction::State`, `transaction::WriteGuard`)
// used by `SessionImpl` and the privileged writer accessors on `FrontendEnv`.
pub(crate) mod transaction;
142
/// The global environment for the frontend server.
///
/// Cloned into every session (`#[derive(Clone)]`); all heavyweight members are
/// behind `Arc`s, so a clone is cheap and shares the same underlying state.
#[derive(Clone)]
pub(crate) struct FrontendEnv {
    // Different session may access catalog at the same time and catalog is protected by a
    // RwLock.
    meta_client: Arc<dyn FrontendMetaClient>,
    catalog_writer: Arc<dyn CatalogWriter>,
    catalog_reader: CatalogReader,
    user_info_writer: Arc<dyn UserInfoWriter>,
    user_info_reader: UserInfoReader,
    worker_node_manager: WorkerNodeManagerRef,
    query_manager: QueryManager,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
    system_params_manager: LocalSystemParamsManagerRef,
    // Global default session config; initialized by the observer manager and
    // snapshotted per-session via `session_params_snapshot`.
    session_params: Arc<RwLock<SessionConfig>>,

    // Advertised address of this frontend node.
    server_addr: HostAddr,
    // RPC client pools for compute, frontend and monitor services.
    client_pool: ComputeClientPoolRef,
    frontend_client_pool: FrontendClientPoolRef,
    monitor_client_pool: MonitorClientPoolRef,

    /// Each session is identified by (`process_id`,
    /// `secret_key`). When Cancel Request received, find corresponding session and cancel all
    /// running queries.
    sessions_map: SessionMapRef,

    pub frontend_metrics: Arc<FrontendMetrics>,

    pub cursor_metrics: Arc<CursorMetrics>,

    source_metrics: Arc<SourceMetrics>,

    /// Batch spill metrics
    spill_metrics: Arc<BatchSpillMetrics>,

    // Config sections loaded at startup (see `init`).
    batch_config: BatchConfig,
    frontend_config: FrontendConfig,
    #[expect(dead_code)]
    meta_config: MetaConfig,
    streaming_config: StreamingConfig,
    udf_config: UdfConfig,

    /// Track creating streaming jobs, used to cancel creating streaming job when cancel request
    /// received.
    creating_streaming_job_tracker: StreamingJobTrackerRef,

    /// Runtime for compute intensive tasks in frontend, e.g. executors in local mode,
    /// root stage in mpp mode.
    compute_runtime: Arc<BackgroundShutdownRuntime>,

    /// Memory context used for batch executors in frontend.
    mem_context: MemoryContext,

    #[cfg(feature = "datafusion")]
    /// Shared budget context for **DataFusion** spillable operators.
    /// Created by [`crate::datafusion::execute::memory_ctx::create_df_spillable_budget_ctx`];
    /// caps total spillable usage so unspillable operators always have headroom.
    /// Not used by the RisingWave batch engine.
    df_spillable_budget_ctx: MemoryContext,

    /// address of the serverless backfill controller.
    serverless_backfill_controller_addr: String,

    /// Prometheus client for querying metrics.
    prometheus_client: Option<PrometheusClient>,

    /// The additional selector used when querying Prometheus.
    prometheus_selector: String,
}
212
/// Session map identified by `(process_id, secret_key)`.
///
/// Shared with background tasks (the idle-transaction monitor spawned in
/// `FrontendEnv::init`) and with `FrontendServiceImpl`, so sessions can be
/// looked up and cancelled from outside the pgwire connection.
pub type SessionMapRef = Arc<RwLock<HashMap<(i32, i32), Arc<SessionImpl>>>>;
215
/// The proportion of frontend memory used for batch processing.
///
/// Applied in `FrontendEnv::init` when sizing the root batch `MemoryContext`
/// from `opts.frontend_total_memory_bytes`.
const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5;
218
impl FrontendEnv {
    /// Build a `FrontendEnv` backed entirely by in-process mocks, for tests.
    ///
    /// Uses `MockFrontendMetaClient`, `MockCatalogWriter` and
    /// `MockUserInfoWriter` instead of RPC-backed implementations, so no meta
    /// node or compute node is contacted. Metrics are `for_test` instances and
    /// the memory context is `MemoryContext::none()`.
    pub fn mock() -> Self {
        use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter};

        let catalog = Arc::new(RwLock::new(Catalog::default()));
        let meta_client = Arc::new(MockFrontendMetaClient {});
        let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
        let catalog_writer = Arc::new(MockCatalogWriter::new(
            catalog.clone(),
            hummock_snapshot_manager.clone(),
        ));
        let catalog_reader = CatalogReader::new(catalog);
        let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
        let user_info_writer = Arc::new(MockUserInfoWriter::new(user_info_manager.clone()));
        let user_info_reader = UserInfoReader::new(user_info_manager);
        let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
        let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
        let compute_client_pool = Arc::new(ComputeClientPool::for_test());
        let frontend_client_pool = Arc::new(FrontendClientPool::for_test());
        let monitor_client_pool = Arc::new(MonitorClientPool::for_test());
        let query_manager = QueryManager::new(
            worker_node_manager.clone(),
            compute_client_pool,
            catalog_reader.clone(),
            Arc::new(DistributedQueryMetrics::for_test()),
            None,
            None,
        );
        let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
        let client_pool = Arc::new(ComputeClientPool::for_test());
        let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
        // Build the compute runtime; thread count comes from the default config
        // (same knob as in `init`, but loaded from `FrontendOpts::default()`).
        let runtime = {
            let mut builder = Builder::new_multi_thread();
            if let Some(frontend_compute_runtime_worker_threads) =
                load_config("", FrontendOpts::default())
                    .batch
                    .frontend_compute_runtime_worker_threads
            {
                builder.worker_threads(frontend_compute_runtime_worker_threads);
            }
            builder
                .thread_name("rw-batch-local")
                .enable_all()
                .build()
                .unwrap()
        };
        let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
        let sessions_map = Arc::new(RwLock::new(HashMap::new()));
        Self {
            meta_client,
            catalog_writer,
            catalog_reader,
            user_info_writer,
            user_info_reader,
            worker_node_manager,
            query_manager,
            hummock_snapshot_manager,
            system_params_manager,
            session_params: Default::default(),
            server_addr,
            client_pool,
            frontend_client_pool,
            monitor_client_pool,
            sessions_map,
            frontend_metrics: Arc::new(FrontendMetrics::for_test()),
            cursor_metrics: Arc::new(CursorMetrics::for_test()),
            batch_config: BatchConfig::default(),
            frontend_config: FrontendConfig::default(),
            meta_config: MetaConfig::default(),
            streaming_config: StreamingConfig::default(),
            udf_config: UdfConfig::default(),
            source_metrics: Arc::new(SourceMetrics::default()),
            spill_metrics: BatchSpillMetrics::for_test(),
            creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
            compute_runtime,
            mem_context: MemoryContext::none(),
            #[cfg(feature = "datafusion")]
            df_spillable_budget_ctx: MemoryContext::none(),
            serverless_backfill_controller_addr: Default::default(),
            prometheus_client: None,
            prometheus_selector: String::new(),
        }
    }

    /// Bootstrap the frontend environment for a real deployment.
    ///
    /// Registers this node with the meta service, subscribes to catalog/user
    /// notifications, activates the worker, spawns the gRPC servers
    /// (health/frontend/monitor) and background tasks (heartbeat, observer,
    /// optional telemetry, idle-transaction monitor), and sets up metrics,
    /// spill cleanup, heap profiling and batch memory budgeting.
    ///
    /// Returns the env together with the join handles and shutdown senders of
    /// the spawned background tasks so the caller can drive graceful shutdown.
    ///
    /// NOTE: the order of the steps below matters — e.g. the observer must be
    /// started before `meta_client.activate` — so keep it as is.
    pub async fn init(opts: FrontendOpts) -> Result<(Self, Vec<JoinHandle<()>>, Vec<Sender<()>>)> {
        let config = load_config(&opts.config_path, &opts);
        info!("Starting frontend node");
        info!("> config: {:?}", config);
        info!(
            "> debug assertions: {}",
            if cfg!(debug_assertions) { "on" } else { "off" }
        );
        info!("> version: {} ({})", RW_VERSION, GIT_SHA);

        // Address other nodes use to reach this frontend; falls back to the
        // listen address when no advertise address is configured.
        let frontend_address: HostAddr = opts
            .advertise_addr
            .as_ref()
            .unwrap_or_else(|| {
                tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
                &opts.listen_addr
            })
            .parse()
            .unwrap();
        info!("advertise addr is {}", frontend_address);

        let rpc_addr: HostAddr = opts.frontend_rpc_listener_addr.parse().unwrap();
        let internal_rpc_host_addr = HostAddr {
            // Use the host of advertise address for the frontend rpc address.
            host: frontend_address.host.clone(),
            port: rpc_addr.port,
        };
        // Register in meta by calling `AddWorkerNode` RPC.
        let (meta_client, system_params_reader) = MetaClient::register_new(
            opts.meta_addr,
            WorkerType::Frontend,
            &frontend_address,
            AddWorkerNodeProperty {
                internal_rpc_host_addr: internal_rpc_host_addr.to_string(),
                ..Default::default()
            },
            Arc::new(config.meta.clone()),
        )
        .await;

        let worker_id = meta_client.worker_id();
        info!("Assigned worker node id {}", worker_id);

        // Keep the registration alive; the heartbeat task is the first
        // background task tracked for shutdown.
        let (heartbeat_join_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
            meta_client.clone(),
            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
        );
        let mut join_handles = vec![heartbeat_join_handle];
        let mut shutdown_senders = vec![heartbeat_shutdown_sender];

        let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
        let hummock_snapshot_manager =
            Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));

        // Catalog reader/writer pair; the writer waits on `catalog_updated_rx`
        // for its changes to be observed.
        let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
        let catalog = Arc::new(RwLock::new(Catalog::default()));
        let catalog_writer = Arc::new(CatalogWriterImpl::new(
            meta_client.clone(),
            catalog_updated_rx.clone(),
            hummock_snapshot_manager.clone(),
        ));
        let catalog_reader = CatalogReader::new(catalog.clone());

        let worker_node_manager = Arc::new(WorkerNodeManager::new());

        let compute_client_pool = Arc::new(ComputeClientPool::new(
            config.batch_exchange_connection_pool_size(),
            config.batch.developer.compute_client_config.clone(),
        ));
        let frontend_client_pool = Arc::new(FrontendClientPool::new(
            1,
            config.batch.developer.frontend_client_config.clone(),
        ));
        let monitor_client_pool = Arc::new(MonitorClientPool::new(1, RpcClientConfig::default()));
        let query_manager = QueryManager::new(
            worker_node_manager.clone(),
            compute_client_pool.clone(),
            catalog_reader.clone(),
            Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
            config.batch.distributed_query_limit,
            config.batch.max_batch_queries_per_frontend_node,
        );

        let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
        let user_info_reader = UserInfoReader::new(user_info_manager.clone());
        let user_info_writer = Arc::new(UserInfoWriterImpl::new(
            meta_client.clone(),
            catalog_updated_rx,
        ));

        let system_params_manager =
            Arc::new(LocalSystemParamsManager::new(system_params_reader.clone()));

        LocalSecretManager::init(
            opts.temp_secret_file_dir,
            meta_client.cluster_id().to_owned(),
            worker_id,
        );

        // This `session_params` should be initialized during the initial notification in `observer_manager`
        let session_params = Arc::new(RwLock::new(SessionConfig::default()));
        let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
        let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));

        // The observer keeps all the shared state above in sync with meta
        // notifications (catalog, users, worker nodes, system/session params).
        let frontend_observer_node = FrontendObserverNode::new(
            worker_node_manager.clone(),
            catalog,
            catalog_updated_tx,
            user_info_manager,
            hummock_snapshot_manager.clone(),
            system_params_manager.clone(),
            session_params.clone(),
            compute_client_pool.clone(),
        );
        let observer_manager =
            ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
                .await;
        let observer_join_handle = observer_manager.start().await;
        join_handles.push(observer_join_handle);

        // Mark this worker as ready to serve; done after the observer is up.
        meta_client.activate(&frontend_address).await?;

        let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
        let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
        let spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());

        if config.server.metrics_level > MetricLevel::Disabled {
            MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
        }

        let health_srv = HealthServiceImpl::new();
        let frontend_srv = FrontendServiceImpl::new(sessions_map.clone());
        let monitor_srv = MonitorServiceImpl::new(config.server.clone());
        let frontend_rpc_addr = opts.frontend_rpc_listener_addr.parse().unwrap();

        let telemetry_manager = TelemetryManager::new(
            Arc::new(meta_client.clone()),
            Arc::new(FrontendTelemetryCreator::new()),
        );

        // if the toml config file or env variable disables telemetry, do not watch system params
        // change because if any of configs disable telemetry, we should never start it
        if config.server.telemetry_enabled && telemetry_env_enabled() {
            let (join_handle, shutdown_sender) = telemetry_manager.start().await;
            join_handles.push(join_handle);
            shutdown_senders.push(shutdown_sender);
        } else {
            tracing::info!("Telemetry didn't start due to config");
        }

        // Serve health / frontend / monitor gRPC on the internal RPC address.
        // Detached task: it runs for the lifetime of the process.
        tokio::spawn(async move {
            tonic::transport::Server::builder()
                .add_service(HealthServer::new(health_srv))
                .add_service(FrontendServiceServer::new(frontend_srv))
                .add_service(configured_monitor_service_server(
                    MonitorServiceServer::new(monitor_srv),
                ))
                .serve(frontend_rpc_addr)
                .await
                .unwrap();
        });
        info!(
            "Health Check RPC Listener is set up on {}",
            opts.frontend_rpc_listener_addr.clone()
        );

        let creating_streaming_job_tracker =
            Arc::new(StreamingJobTracker::new(frontend_meta_client.clone()));

        let runtime = {
            let mut builder = Builder::new_multi_thread();
            if let Some(frontend_compute_runtime_worker_threads) =
                config.batch.frontend_compute_runtime_worker_threads
            {
                builder.worker_threads(frontend_compute_runtime_worker_threads);
            }
            builder
                .thread_name("rw-batch-local")
                .enable_all()
                .build()
                .unwrap()
        };
        let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));

        let sessions = sessions_map.clone();
        // Idle transaction background monitor
        let join_handle = tokio::spawn(async move {
            let mut check_idle_txn_interval =
                tokio::time::interval(core::time::Duration::from_secs(10));
            // Skip (rather than burst) missed ticks if a sweep runs long.
            check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            check_idle_txn_interval.reset();
            loop {
                check_idle_txn_interval.tick().await;
                sessions.read().values().for_each(|session| {
                    let _ = session.check_idle_in_transaction_timeout();
                })
            }
        });
        join_handles.push(join_handle);

        // Clean up the spill directory.
        #[cfg(not(madsim))]
        if config.batch.enable_spill {
            SpillOp::clean_spill_directory()
                .await
                .map_err(|err| anyhow!(err))?;
        }

        let total_memory_bytes = opts.frontend_total_memory_bytes;
        let heap_profiler =
            HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
        // Run a background heap profiler
        heap_profiler.start();

        // Budget a fixed share of frontend memory for batch execution.
        let batch_memory_limit = total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION;
        let mem_context = MemoryContext::root(
            frontend_metrics.batch_total_mem.clone(),
            batch_memory_limit as u64,
        );
        #[cfg(feature = "datafusion")]
        let df_spillable_budget_ctx =
            crate::datafusion::create_df_spillable_budget_ctx(&mem_context);

        // Initialize Prometheus client if endpoint is provided
        // (failure to connect is non-fatal: we log and continue without one).
        let prometheus_client = if let Some(ref endpoint) = opts.prometheus_endpoint {
            match PrometheusClient::try_from(endpoint.as_str()) {
                Ok(client) => {
                    info!("Prometheus client initialized with endpoint: {}", endpoint);
                    Some(client)
                }
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        "failed to initialize Prometheus client",
                    );
                    None
                }
            }
        } else {
            None
        };

        // Initialize Prometheus selector
        let prometheus_selector = opts.prometheus_selector.unwrap_or_default();

        info!(
            "Frontend  total_memory: {} batch_memory: {}",
            convert(total_memory_bytes as _),
            convert(batch_memory_limit as _),
        );

        Ok((
            Self {
                catalog_reader,
                catalog_writer,
                user_info_reader,
                user_info_writer,
                worker_node_manager,
                meta_client: frontend_meta_client,
                query_manager,
                hummock_snapshot_manager,
                system_params_manager,
                session_params,
                server_addr: frontend_address,
                client_pool: compute_client_pool,
                frontend_client_pool,
                monitor_client_pool,
                frontend_metrics,
                cursor_metrics,
                spill_metrics,
                sessions_map,
                batch_config: config.batch,
                frontend_config: config.frontend,
                meta_config: config.meta,
                streaming_config: config.streaming,
                serverless_backfill_controller_addr: opts.serverless_backfill_controller_addr,
                udf_config: config.udf,
                source_metrics,
                creating_streaming_job_tracker,
                compute_runtime,
                mem_context,
                #[cfg(feature = "datafusion")]
                df_spillable_budget_ctx,
                prometheus_client,
                prometheus_selector,
            },
            join_handles,
            shutdown_senders,
        ))
    }

    /// Get a reference to the frontend env's catalog writer.
    ///
    /// This method is intentionally private, and a write guard is required for the caller to
    /// prove that the write operations are permitted in the current transaction.
    fn catalog_writer(&self, _guard: transaction::WriteGuard) -> &dyn CatalogWriter {
        &*self.catalog_writer
    }

    /// Get a reference to the frontend env's catalog reader.
    pub fn catalog_reader(&self) -> &CatalogReader {
        &self.catalog_reader
    }

    /// Get a reference to the frontend env's user info writer.
    ///
    /// This method is intentionally private, and a write guard is required for the caller to
    /// prove that the write operations are permitted in the current transaction.
    fn user_info_writer(&self, _guard: transaction::WriteGuard) -> &dyn UserInfoWriter {
        &*self.user_info_writer
    }

    /// Get a reference to the frontend env's user info reader.
    pub fn user_info_reader(&self) -> &UserInfoReader {
        &self.user_info_reader
    }

    /// Shared handle to the worker node manager.
    pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef {
        self.worker_node_manager.clone()
    }

    /// Borrow the frontend's meta client.
    pub fn meta_client(&self) -> &dyn FrontendMetaClient {
        &*self.meta_client
    }

    /// Shared handle to the frontend's meta client.
    pub fn meta_client_ref(&self) -> Arc<dyn FrontendMetaClient> {
        self.meta_client.clone()
    }

    /// Borrow the batch query manager.
    pub fn query_manager(&self) -> &QueryManager {
        &self.query_manager
    }

    /// Borrow the Hummock snapshot manager handle.
    pub fn hummock_snapshot_manager(&self) -> &HummockSnapshotManagerRef {
        &self.hummock_snapshot_manager
    }

    /// Borrow the local system params manager handle.
    pub fn system_params_manager(&self) -> &LocalSystemParamsManagerRef {
        &self.system_params_manager
    }

    /// Clone the current global default session config.
    /// `read_recursive` (parking_lot) allows re-entrant reads on a thread that
    /// already holds a read lock, avoiding self-deadlock.
    pub fn session_params_snapshot(&self) -> SessionConfig {
        self.session_params.read_recursive().clone()
    }

    /// Address of the serverless backfill controller.
    pub fn sbc_address(&self) -> &String {
        &self.serverless_backfill_controller_addr
    }

    /// Get a reference to the Prometheus client if available.
    pub fn prometheus_client(&self) -> Option<&PrometheusClient> {
        self.prometheus_client.as_ref()
    }

    /// Get the Prometheus selector string.
    pub fn prometheus_selector(&self) -> &str {
        &self.prometheus_selector
    }

    /// Advertised address of this frontend node.
    pub fn server_address(&self) -> &HostAddr {
        &self.server_addr
    }

    /// Shared handle to the compute-node client pool.
    pub fn client_pool(&self) -> ComputeClientPoolRef {
        self.client_pool.clone()
    }

    /// Shared handle to the frontend client pool.
    pub fn frontend_client_pool(&self) -> FrontendClientPoolRef {
        self.frontend_client_pool.clone()
    }

    /// Shared handle to the monitor client pool.
    pub fn monitor_client_pool(&self) -> MonitorClientPoolRef {
        self.monitor_client_pool.clone()
    }

    /// Borrow the batch config section.
    pub fn batch_config(&self) -> &BatchConfig {
        &self.batch_config
    }

    /// Borrow the frontend config section.
    pub fn frontend_config(&self) -> &FrontendConfig {
        &self.frontend_config
    }

    /// Borrow the streaming config section.
    pub fn streaming_config(&self) -> &StreamingConfig {
        &self.streaming_config
    }

    /// Borrow the UDF config section.
    pub fn udf_config(&self) -> &UdfConfig {
        &self.udf_config
    }

    /// Shared handle to the source metrics.
    pub fn source_metrics(&self) -> Arc<SourceMetrics> {
        self.source_metrics.clone()
    }

    /// Shared handle to the batch spill metrics.
    pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
        self.spill_metrics.clone()
    }

    /// Borrow the tracker of in-progress streaming job creation.
    pub fn creating_streaming_job_tracker(&self) -> &StreamingJobTrackerRef {
        &self.creating_streaming_job_tracker
    }

    /// Borrow the map of all live sessions on this frontend.
    pub fn sessions_map(&self) -> &SessionMapRef {
        &self.sessions_map
    }

    /// Shared handle to the compute runtime for local batch execution.
    pub fn compute_runtime(&self) -> Arc<BackgroundShutdownRuntime> {
        self.compute_runtime.clone()
    }

    /// Cancel queries (i.e. batch queries) in session.
    /// If the session exists return true, otherwise, return false.
    pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {
        cancel_queries_in_session(session_id, self.sessions_map.clone())
    }

    /// Cancel creating jobs (i.e. streaming queries) in session.
    /// If the session exists return true, otherwise, return false.
    pub fn cancel_creating_jobs_in_session(&self, session_id: SessionId) -> bool {
        cancel_creating_jobs_in_session(session_id, self.sessions_map.clone())
    }

    /// Memory context used for batch executors in frontend.
    pub fn mem_context(&self) -> MemoryContext {
        self.mem_context.clone()
    }

    /// Shared budget context for DataFusion spillable operators
    /// (see the field doc on [`FrontendEnv`]).
    #[cfg(feature = "datafusion")]
    pub fn df_spillable_budget_ctx(&self) -> MemoryContext {
        self.df_spillable_budget_ctx.clone()
    }
}
735
/// Authentication context of a session: which user is connected to which database.
#[derive(Clone)]
pub struct AuthContext {
    // Name of the database the session is connected to.
    pub database: String,
    // Name of the authenticated user.
    pub user_name: String,
    // Numeric id of the authenticated user.
    pub user_id: UserId,
}
742
743impl AuthContext {
744    pub fn new(database: String, user_name: String, user_id: UserId) -> Self {
745        Self {
746            database,
747            user_name,
748            user_id,
749        }
750    }
751}
/// State of a single client session on this frontend: auth context, session
/// config, transaction state, notice channel, cursors and temporary/staging
/// catalogs.
pub struct SessionImpl {
    // Shared frontend environment (catalog, meta client, pools, metrics, ...).
    env: FrontendEnv,
    // Database / user of this session; behind a lock since it can be updated.
    auth_context: Arc<RwLock<AuthContext>>,
    /// Used for user authentication.
    user_authenticator: UserAuthenticator,
    /// Stores the value of configurations.
    config_map: Arc<RwLock<SessionConfig>>,

    /// Channel sender for frontend handler to send notices.
    notice_tx: UnboundedSender<String>,
    /// Channel receiver for pgwire to take notices and send to clients.
    notice_rx: Mutex<UnboundedReceiver<String>>,

    /// Identified by `process_id`, `secret_key`. Corresponds to `SessionManager`.
    id: (i32, i32),

    /// Client address
    peer_addr: AddressRef,

    /// Transaction state.
    /// TODO: get rid of the `Mutex` here; it is a workaround for the `Send`
    /// requirement of async functions — there should actually be no contention.
    txn: Arc<Mutex<transaction::State>>,

    /// Query cancel flag.
    /// This flag is set only when current query is executed in local mode, and used to cancel
    /// local query.
    current_query_cancel_flag: Mutex<Option<ShutdownSender>>,

    /// execution context represents the lifetime of a running SQL in the current session
    exec_context: Mutex<Option<Weak<ExecContext>>>,

    /// Last idle instant
    last_idle_instant: Arc<Mutex<Option<Instant>>>,

    // Per-session cursor bookkeeping.
    cursor_manager: Arc<CursorManager>,

    /// temporary sources for the current session
    temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,

    /// staging catalogs for the current session
    staging_catalog_manager: Arc<Mutex<StagingCatalogManager>>,
}
795
/// If TEMPORARY or TEMP is specified, the source is created as a temporary source.
/// Temporary sources are automatically dropped at the end of a session.
/// Temporary sources are expected to be selected by batch queries, not streaming queries.
/// Temporary sources currently are only used by cloud portal to preview the data during table and
/// source creation, so it is an internal feature and not exposed to users.
/// The current implementation supports temporary sources with minimum effort,
/// so we don't care about the database name and schema name, but only care about the source name.
/// Temporary sources can only be shown via `show sources` command but not other system tables.
#[derive(Default, Clone)]
pub struct TemporarySourceManager {
    // Keyed by bare source name only — no database/schema qualification (see doc above).
    sources: HashMap<String, SourceCatalog>,
}
808
809impl TemporarySourceManager {
810    pub fn new() -> Self {
811        Self {
812            sources: HashMap::new(),
813        }
814    }
815
816    pub fn create_source(&mut self, name: String, source: SourceCatalog) {
817        self.sources.insert(name, source);
818    }
819
820    pub fn drop_source(&mut self, name: &str) {
821        self.sources.remove(name);
822    }
823
824    pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
825        self.sources.get(name)
826    }
827
828    pub fn keys(&self) -> Vec<String> {
829        self.sources.keys().cloned().collect()
830    }
831}
832
/// Staging catalog manager is used to manage the tables creating in the current session.
#[derive(Default, Clone)]
pub struct StagingCatalogManager {
    // staging tables creating in the current session, keyed by bare table name.
    tables: HashMap<String, TableCatalog>,
}
839
840impl StagingCatalogManager {
841    pub fn new() -> Self {
842        Self {
843            tables: HashMap::new(),
844        }
845    }
846
847    pub fn create_table(&mut self, name: String, table: TableCatalog) {
848        self.tables.insert(name, table);
849    }
850
851    pub fn drop_table(&mut self, name: &str) {
852        self.tables.remove(name);
853    }
854
855    pub fn get_table(&self, name: &str) -> Option<&TableCatalog> {
856        self.tables.get(name)
857    }
858}
859
/// Errors that may arise while checking whether a relation name is already taken.
#[derive(Error, Debug)]
pub enum CheckRelationError {
    /// Resolving the (possibly schema-qualified) name failed.
    #[error("{0}")]
    Resolve(#[from] ResolveQualifiedNameError),
    /// Catalog lookup failed (e.g. missing schema, duplicated name).
    #[error("{0}")]
    Catalog(#[from] CatalogError),
}
867
868impl From<CheckRelationError> for RwError {
869    fn from(e: CheckRelationError) -> Self {
870        match e {
871            CheckRelationError::Resolve(e) => e.into(),
872            CheckRelationError::Catalog(e) => e.into(),
873        }
874    }
875}
876
877impl SessionImpl {
    /// Construct a session for an authenticated connection.
    ///
    /// `id` is the `(process_id, secret_key)` pair that identifies the session in the
    /// `SessionManager`; `peer_addr` is the client's address; `session_config` seeds
    /// the session-local configuration map.
    pub(crate) fn new(
        env: FrontendEnv,
        auth_context: AuthContext,
        user_authenticator: UserAuthenticator,
        id: SessionId,
        peer_addr: AddressRef,
        session_config: SessionConfig,
    ) -> Self {
        // Grab the metrics handle before `env` is moved into the struct below.
        let cursor_metrics = env.cursor_metrics.clone();
        // Notice channel: handlers push via `notice_tx`, pgwire drains via `notice_rx`.
        let (notice_tx, notice_rx) = mpsc::unbounded_channel();

        Self {
            env,
            auth_context: Arc::new(RwLock::new(auth_context)),
            user_authenticator,
            config_map: Arc::new(RwLock::new(session_config)),
            id,
            peer_addr,
            txn: Default::default(),
            current_query_cancel_flag: Mutex::new(None),
            notice_tx,
            notice_rx: Mutex::new(notice_rx),
            exec_context: Mutex::new(None),
            last_idle_instant: Default::default(),
            cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
            temporary_source_manager: Default::default(),
            staging_catalog_manager: Default::default(),
        }
    }
907
908    #[cfg(test)]
909    pub fn mock() -> Self {
910        let env = FrontendEnv::mock();
911        let (notice_tx, notice_rx) = mpsc::unbounded_channel();
912
913        Self {
914            env: FrontendEnv::mock(),
915            auth_context: Arc::new(RwLock::new(AuthContext::new(
916                DEFAULT_DATABASE_NAME.to_owned(),
917                DEFAULT_SUPER_USER.to_owned(),
918                DEFAULT_SUPER_USER_ID,
919            ))),
920            user_authenticator: UserAuthenticator::None,
921            config_map: Default::default(),
922            // Mock session use non-sense id.
923            id: (0, 0),
924            txn: Default::default(),
925            current_query_cancel_flag: Mutex::new(None),
926            notice_tx,
927            notice_rx: Mutex::new(notice_rx),
928            exec_context: Mutex::new(None),
929            peer_addr: Address::Tcp(SocketAddr::new(
930                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
931                8080,
932            ))
933            .into(),
934            last_idle_instant: Default::default(),
935            cursor_manager: Arc::new(CursorManager::new(env.cursor_metrics)),
936            temporary_source_manager: Default::default(),
937            staging_catalog_manager: Default::default(),
938        }
939    }
940
    /// Shared frontend environment backing this session.
    pub(crate) fn env(&self) -> &FrontendEnv {
        &self.env
    }

    /// Snapshot of the current authentication context (cloned; does not track later changes).
    pub fn auth_context(&self) -> Arc<AuthContext> {
        let ctx = self.auth_context.read();
        Arc::new(ctx.clone())
    }

    /// Name of the database this session is currently connected to.
    pub fn database(&self) -> String {
        self.auth_context.read().database.clone()
    }

    /// Id of the current database.
    ///
    /// # Panics
    /// Panics if the session's database no longer exists in the catalog
    /// (e.g. dropped concurrently).
    pub fn database_id(&self) -> DatabaseId {
        let db_name = self.database();
        self.env
            .catalog_reader()
            .read_guard()
            .get_database_by_name(&db_name)
            .map(|db| db.id())
            .expect("session database not found")
    }

    /// Name of the authenticated user.
    pub fn user_name(&self) -> String {
        self.auth_context.read().user_name.clone()
    }

    /// Id of the authenticated user.
    pub fn user_id(&self) -> UserId {
        self.auth_context.read().user_id
    }

    /// Switch the session to another database (e.g. on reconnect/`\c`).
    pub fn update_database(&self, database: String) {
        self.auth_context.write().database = database;
    }

    /// Shared handle to the session configuration map.
    pub fn shared_config(&self) -> Arc<RwLock<SessionConfig>> {
        Arc::clone(&self.config_map)
    }

    /// Read guard over the session configuration. Keep the guard short-lived.
    pub fn config(&self) -> RwLockReadGuard<'_, SessionConfig> {
        self.config_map.read()
    }

    /// Set a session configuration entry, returning the new value as a string.
    pub fn set_config(&self, key: &str, value: String) -> Result<String> {
        self.config_map
            .write()
            .set(key, value, &mut ())
            .map_err(Into::into)
    }

    /// Reset a session configuration entry to its default, returning the new value.
    pub fn reset_config(&self, key: &str) -> Result<String> {
        self.config_map
            .write()
            .reset(key, &mut ())
            .map_err(Into::into)
    }

    /// Set (when `value` is `Some`) or reset (when `None`) a configuration entry,
    /// notifying `reporter` of the change.
    pub fn set_config_report(
        &self,
        key: &str,
        value: Option<String>,
        mut reporter: impl ConfigReporter,
    ) -> Result<String> {
        if let Some(value) = value {
            self.config_map
                .write()
                .set(key, value, &mut reporter)
                .map_err(Into::into)
        } else {
            self.config_map
                .write()
                .reset(key, &mut reporter)
                .map_err(Into::into)
        }
    }
1016
    /// `(process_id, secret_key)` pair identifying this session.
    pub fn session_id(&self) -> SessionId {
        self.id
    }

    /// SQL text of the statement currently executing, if any.
    /// `None` when idle or when the execution context has already been dropped.
    pub fn running_sql(&self) -> Option<Arc<str>> {
        self.exec_context
            .lock()
            .as_ref()
            .and_then(|weak| weak.upgrade())
            .map(|context| context.running_sql.clone())
    }

    /// Cursor manager of this session.
    pub fn get_cursor_manager(&self) -> Arc<CursorManager> {
        self.cursor_manager.clone()
    }

    /// Address of the connected client.
    pub fn peer_addr(&self) -> &Address {
        &self.peer_addr
    }

    /// Milliseconds elapsed since the currently running SQL started, if any is running.
    pub fn elapse_since_running_sql(&self) -> Option<u128> {
        self.exec_context
            .lock()
            .as_ref()
            .and_then(|weak| weak.upgrade())
            .map(|context| context.last_instant.elapsed().as_millis())
    }

    /// Milliseconds the session has been idle, or `None` if no idle instant is recorded.
    pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
        self.last_idle_instant
            .lock()
            .as_ref()
            .map(|x| x.elapsed().as_millis())
    }
1051
    /// Check whether `name` collides with an existing relation in the target schema.
    ///
    /// Returns `Either::Left(())` when creation may proceed, or
    /// `Either::Right(response)` when `if_not_exists` applies and a skip-notice
    /// response should be sent instead. Unqualified names resolve against the
    /// first valid schema on the search path.
    pub fn check_relation_name_duplicated(
        &self,
        name: ObjectName,
        stmt_type: StatementType,
        if_not_exists: bool,
    ) -> std::result::Result<Either<(), RwPgResponse>, CheckRelationError> {
        let db_name = &self.database();
        let catalog_reader = self.env().catalog_reader().read_guard();
        let (schema_name, relation_name) = {
            let (schema_name, relation_name) =
                Binder::resolve_schema_qualified_name(db_name, &name)?;
            let search_path = self.config().search_path();
            let user_name = &self.user_name();
            let schema_name = match schema_name {
                Some(schema_name) => schema_name,
                None => catalog_reader
                    .first_valid_schema(db_name, &search_path, user_name)?
                    .name(),
            };
            (schema_name, relation_name)
        };
        match catalog_reader.check_relation_name_duplicated(db_name, &schema_name, &relation_name) {
            Err(e) if if_not_exists => {
                if let CatalogErrorInner::Duplicated {
                    name,
                    under_creation,
                    ..
                } = e.inner()
                {
                    // If relation is created, return directly.
                    // Otherwise, the job status is `is_creating`. Since frontend receives the catalog asynchronously, we can't
                    // determine the real status of the meta at this time. We regard it as `not_exists` and delay the check to meta.
                    // Only the type in StreamingJob (defined in streaming_job.rs) and Subscription may be `is_creating`.
                    if !*under_creation {
                        Ok(Either::Right(
                            PgResponse::builder(stmt_type)
                                .notice(format!("relation \"{}\" already exists, skipping", name))
                                .into(),
                        ))
                    } else if stmt_type == StatementType::CREATE_SUBSCRIPTION {
                        // For now, when a Subscription is creating, we return directly with an additional message.
                        // TODO: Subscription should also be processed in the same way as StreamingJob.
                        Ok(Either::Right(
                            PgResponse::builder(stmt_type)
                                .notice(format!(
                                    "relation \"{}\" already exists but still creating, skipping",
                                    name
                                ))
                                .into(),
                        ))
                    } else {
                        // Still creating: treat as not-exists and let meta do the final check.
                        Ok(Either::Left(()))
                    }
                } else {
                    // Non-duplication catalog error: propagate even under IF NOT EXISTS.
                    Err(e.into())
                }
            }
            Err(e) => Err(e.into()),
            Ok(_) => Ok(Either::Left(())),
        }
    }
1113
1114    pub fn check_secret_name_duplicated(&self, name: ObjectName) -> Result<()> {
1115        let db_name = &self.database();
1116        let catalog_reader = self.env().catalog_reader().read_guard();
1117        let (schema_name, secret_name) = {
1118            let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1119            let search_path = self.config().search_path();
1120            let user_name = &self.user_name();
1121            let schema_name = match schema_name {
1122                Some(schema_name) => schema_name,
1123                None => catalog_reader
1124                    .first_valid_schema(db_name, &search_path, user_name)?
1125                    .name(),
1126            };
1127            (schema_name, secret_name)
1128        };
1129        catalog_reader
1130            .check_secret_name_duplicated(db_name, &schema_name, &secret_name)
1131            .map_err(RwError::from)
1132    }
1133
    /// Fail with a duplication error if a connection named `name` already exists.
    /// Unqualified names resolve against the first valid schema on the search path.
    pub fn check_connection_name_duplicated(&self, name: ObjectName) -> Result<()> {
        let db_name = &self.database();
        let catalog_reader = self.env().catalog_reader().read_guard();
        let (schema_name, connection_name) = {
            let (schema_name, connection_name) =
                Binder::resolve_schema_qualified_name(db_name, &name)?;
            let search_path = self.config().search_path();
            let user_name = &self.user_name();
            let schema_name = match schema_name {
                Some(schema_name) => schema_name,
                None => catalog_reader
                    .first_valid_schema(db_name, &search_path, user_name)?
                    .name(),
            };
            (schema_name, connection_name)
        };
        catalog_reader
            .check_connection_name_duplicated(db_name, &schema_name, &connection_name)
            .map_err(RwError::from)
    }

    /// Check for a duplicate function with the same name AND argument types
    /// (functions are overloaded on argument types, unlike relations).
    ///
    /// Returns `Either::Right(response)` with a skip-notice when `if_not_exists`
    /// applies, `Either::Left(())` when creation may proceed.
    pub fn check_function_name_duplicated(
        &self,
        stmt_type: StatementType,
        name: ObjectName,
        arg_types: &[DataType],
        if_not_exists: bool,
    ) -> Result<Either<(), RwPgResponse>> {
        let db_name = &self.database();
        let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
        let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?;

        let catalog_reader = self.env().catalog_reader().read_guard();
        if catalog_reader
            .get_schema_by_id(database_id, schema_id)?
            .get_function_by_name_args(&function_name, arg_types)
            .is_some()
        {
            // Render `name(type1,type2,...)` for the notice/error message.
            let full_name = format!(
                "{function_name}({})",
                arg_types.iter().map(|t| t.to_string()).join(",")
            );
            if if_not_exists {
                Ok(Either::Right(
                    PgResponse::builder(stmt_type)
                        .notice(format!(
                            "function \"{}\" already exists, skipping",
                            full_name
                        ))
                        .into(),
                ))
            } else {
                Err(CatalogError::duplicated("function", full_name).into())
            }
        } else {
            Ok(Either::Left(()))
        }
    }

    /// Also check if the user has the privilege to create in the schema.
    pub fn get_database_and_schema_id_for_create(
        &self,
        schema_name: Option<String>,
    ) -> Result<(DatabaseId, SchemaId)> {
        let db_name = &self.database();

        let search_path = self.config().search_path();
        let user_name = &self.user_name();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let schema = match schema_name {
            Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
            None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
        };
        let schema_name = schema.name();

        // Reject writes into read-only/system schemas, then verify CREATE privilege.
        check_schema_writable(&schema_name)?;
        self.check_privileges(&[ObjectCheckItem::new(
            schema.owner(),
            AclMode::Create,
            schema_name,
            schema.id(),
        )])?;

        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
        Ok((db_id, schema.id()))
    }
1221
    /// Look up a connection by name (via the search path when unqualified) and
    /// verify the session user has USAGE privilege on it.
    pub fn get_connection_by_name(
        &self,
        schema_name: Option<String>,
        connection_name: &str,
    ) -> Result<Arc<ConnectionCatalog>> {
        let db_name = &self.database();
        let search_path = self.config().search_path();
        let user_name = &self.user_name();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
        let (connection, _) =
            catalog_reader.get_connection_by_name(db_name, schema_path, connection_name)?;

        self.check_privileges(&[ObjectCheckItem::new(
            connection.owner(),
            AclMode::Usage,
            connection.name.clone(),
            connection.id,
        )])?;

        Ok(connection.clone())
    }

    /// Look up a subscription by name within the schema identified by `schema_id`.
    /// No privilege check is performed here.
    pub fn get_subscription_by_schema_id_name(
        &self,
        schema_id: SchemaId,
        subscription_name: &str,
    ) -> Result<Arc<SubscriptionCatalog>> {
        let db_name = &self.database();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
        let schema = catalog_reader.get_schema_by_id(db_id, schema_id)?;
        let subscription = schema
            .get_subscription_by_name(subscription_name)
            .ok_or_else(|| {
                RwError::from(ErrorCode::ItemNotFound(format!(
                    "subscription {} not found",
                    subscription_name
                )))
            })?;
        Ok(subscription.clone())
    }

    /// Look up a subscription by name, via the search path when unqualified.
    /// No privilege check is performed here.
    pub fn get_subscription_by_name(
        &self,
        schema_name: Option<String>,
        subscription_name: &str,
    ) -> Result<Arc<SubscriptionCatalog>> {
        let db_name = &self.database();
        let search_path = self.config().search_path();
        let user_name = &self.user_name();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
        let (subscription, _) =
            catalog_reader.get_subscription_by_name(db_name, schema_path, subscription_name)?;
        Ok(subscription.clone())
    }

    /// Look up any table (regardless of state) by id. No privilege check.
    pub fn get_table_by_id(&self, table_id: TableId) -> Result<Arc<TableCatalog>> {
        let catalog_reader = self.env().catalog_reader().read_guard();
        Ok(catalog_reader.get_any_table_by_id(table_id)?.clone())
    }

    /// Look up a *created* table by name in the given database/schema and verify
    /// the session user has SELECT privilege on it.
    pub fn get_table_by_name(
        &self,
        table_name: &str,
        db_id: DatabaseId,
        schema_id: SchemaId,
    ) -> Result<Arc<TableCatalog>> {
        let catalog_reader = self.env().catalog_reader().read_guard();
        let table = catalog_reader
            .get_schema_by_id(db_id, schema_id)?
            .get_created_table_by_name(table_name)
            .ok_or_else(|| {
                // NOTE(review): this builds a `std::io::Error` while sibling lookups use
                // catalog/`ErrorCode` errors — confirm whether callers depend on this
                // error type/message before unifying.
                Error::new(
                    ErrorKind::InvalidInput,
                    format!("table \"{}\" does not exist", table_name),
                )
            })?;

        self.check_privileges(&[ObjectCheckItem::new(
            table.owner(),
            AclMode::Select,
            table_name.to_owned(),
            table.id,
        )])?;

        Ok(table.clone())
    }

    /// Look up a secret by name (via the search path when unqualified) and verify
    /// the session user has USAGE privilege on it.
    pub fn get_secret_by_name(
        &self,
        schema_name: Option<String>,
        secret_name: &str,
    ) -> Result<Arc<SecretCatalog>> {
        let db_name = &self.database();
        let search_path = self.config().search_path();
        let user_name = &self.user_name();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
        let (secret, _) = catalog_reader.get_secret_by_name(db_name, schema_path, secret_name)?;

        self.check_privileges(&[ObjectCheckItem::new(
            secret.owner(),
            AclMode::Usage,
            secret.name.clone(),
            secret.id,
        )])?;

        Ok(secret.clone())
    }
1337
    /// List up to `max_count` change-log epochs of `table_id` in the range
    /// `[min_epoch, committed_epoch]`, fetched from the meta service.
    ///
    /// Returns an empty vector when the table has no committed epoch in the
    /// current hummock snapshot, or when meta has no change log for it.
    pub async fn list_change_log_epochs(
        &self,
        table_id: TableId,
        min_epoch: u64,
        max_count: u32,
    ) -> Result<Vec<u64>> {
        // Upper bound: the table's committed epoch in the pinned snapshot version.
        let Some(max_epoch) = self
            .env
            .hummock_snapshot_manager()
            .acquire()
            .version()
            .state_table_info
            .info()
            .get(&table_id)
            .map(|s| s.committed_epoch)
        else {
            return Ok(vec![]);
        };
        let ret = self
            .env
            .meta_client_ref()
            .get_hummock_table_change_log(
                Some(min_epoch),
                Some(max_epoch),
                Some(iter::once(table_id).collect()),
                true,
                Some(max_count),
            )
            .await?;
        let Some(e) = ret.get(&table_id) else {
            return Ok(vec![]);
        };
        // Re-filter and cap locally: meta's response may span a wider range.
        Ok(e.iter()
            .flat_map(|l| l.epochs())
            .filter(|e| *e >= min_epoch && *e <= max_epoch)
            .take(max_count as usize)
            .collect())
    }
1376
1377    pub fn clear_cancel_query_flag(&self) {
1378        let mut flag = self.current_query_cancel_flag.lock();
1379        *flag = None;
1380    }
1381
1382    pub fn reset_cancel_query_flag(&self) -> ShutdownToken {
1383        let mut flag = self.current_query_cancel_flag.lock();
1384        let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
1385        *flag = Some(shutdown_tx);
1386        shutdown_rx
1387    }
1388
    /// Cancel the query currently running in this session.
    ///
    /// A set cancel flag means the query runs in local mode and is cancelled via
    /// its shutdown sender; otherwise the query (if any) runs in distributed mode
    /// and is cancelled through the query manager.
    pub fn cancel_current_query(&self) {
        let mut flag_guard = self.current_query_cancel_flag.lock();
        if let Some(sender) = flag_guard.take() {
            info!("Trying to cancel query in local mode.");
            // Current running query is in local mode
            sender.cancel();
            info!("Cancel query request sent.");
        } else {
            info!("Trying to cancel query in distributed mode.");
            self.env.query_manager().cancel_queries_in_session(self.id)
        }
    }

    /// Abort all streaming jobs currently being created by this session.
    pub fn cancel_current_creating_job(&self) {
        self.env.creating_streaming_job_tracker.abort_jobs(self.id);
    }
1405
    /// Parse and execute a single SQL statement, returning its pgwire response.
    ///
    /// This function is only used for tests now.
    /// Maybe we can remove it in the future.
    pub async fn run_statement(
        self: Arc<Self>,
        sql: Arc<str>,
        formats: Vec<Format>,
    ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
        // Parse sql.
        let mut stmts = Parser::parse_sql(&sql)?;
        if stmts.is_empty() {
            return Ok(PgResponse::empty_result(
                pgwire::pg_response::StatementType::EMPTY,
            ));
        }
        // Only one statement per call is supported; reply with a notice otherwise.
        if stmts.len() > 1 {
            return Ok(
                PgResponse::builder(pgwire::pg_response::StatementType::EMPTY)
                    .notice("cannot insert multiple commands into statement")
                    .into(),
            );
        }
        let stmt = stmts.swap_remove(0);
        // Heap-pin the handler future (presumably to keep this future small — confirm).
        let rsp = Box::pin(handle(self, stmt, sql.clone(), formats)).await?;
        Ok(rsp)
    }
1431
1432    pub fn notice_to_user(&self, str: impl Into<String>) {
1433        let notice = str.into();
1434        tracing::trace!(notice, "notice to user");
1435        self.notice_tx
1436            .send(notice)
1437            .expect("notice channel should not be closed");
1438    }
1439
1440    pub fn is_barrier_read(&self) -> bool {
1441        match self.config().visibility_mode() {
1442            VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
1443            VisibilityMode::All => true,
1444            VisibilityMode::Checkpoint => false,
1445        }
1446    }
1447
1448    pub fn statement_timeout(&self) -> Duration {
1449        if self.config().statement_timeout().millis() == 0 {
1450            Duration::from_secs(self.env.batch_config.statement_timeout_in_sec as u64)
1451        } else {
1452            Duration::from_millis(self.config().statement_timeout().millis() as u64)
1453        }
1454    }
1455
    /// Register a temporary source in this session, keyed by its name.
    pub fn create_temporary_source(&self, source: SourceCatalog) {
        self.temporary_source_manager
            .lock()
            .create_source(source.name.clone(), source);
    }

    /// Fetch a clone of the temporary source named `name`, if it exists.
    pub fn get_temporary_source(&self, name: &str) -> Option<SourceCatalog> {
        self.temporary_source_manager
            .lock()
            .get_source(name)
            .cloned()
    }

    /// Drop the temporary source named `name`, if it exists.
    pub fn drop_temporary_source(&self, name: &str) {
        self.temporary_source_manager.lock().drop_source(name);
    }

    /// Snapshot (clone) of the whole temporary source manager.
    pub fn temporary_source_manager(&self) -> TemporarySourceManager {
        self.temporary_source_manager.lock().clone()
    }

    /// Register a staging table in this session, keyed by its name.
    pub fn create_staging_table(&self, table: TableCatalog) {
        self.staging_catalog_manager
            .lock()
            .create_table(table.name.clone(), table);
    }

    /// Drop the staging table named `name`, if it exists.
    pub fn drop_staging_table(&self, name: &str) {
        self.staging_catalog_manager.lock().drop_table(name);
    }

    /// Snapshot (clone) of the whole staging catalog manager.
    pub fn staging_catalog_manager(&self) -> StagingCatalogManager {
        self.staging_catalog_manager.lock().clone()
    }
1490
    /// Check cluster-wide resource limits before creating a streaming job.
    ///
    /// Exceeding a hard limit yields an error; exceeding a soft limit only sends
    /// a notice to the client. Skipped entirely when `bypass_cluster_limits` is set.
    pub async fn check_cluster_limits(&self) -> Result<()> {
        if self.config().bypass_cluster_limits() {
            return Ok(());
        }

        // Render the user-facing message; wording differs for hard vs. soft limit.
        let gen_message = |ActorCountPerParallelism {
                               worker_id_to_actor_count,
                               hard_limit,
                               soft_limit,
                           }: ActorCountPerParallelism,
                           exceed_hard_limit: bool|
         -> String {
            let (limit_type, action) = if exceed_hard_limit {
                ("critical", "Scale the cluster immediately to proceed.")
            } else {
                (
                    "recommended",
                    "Consider scaling the cluster for optimal performance.",
                )
            };
            format!(
                r#"Actor count per parallelism exceeds the {limit_type} limit.

Depending on your workload, this may overload the cluster and cause performance/stability issues. {action}

HINT:
- For best practices on managing streaming jobs: https://docs.risingwave.com/operate/manage-a-large-number-of-streaming-jobs
- To bypass the check (if the cluster load is acceptable): `[ALTER SYSTEM] SET bypass_cluster_limits TO true`.
  See https://docs.risingwave.com/operate/view-configure-runtime-parameters#how-to-configure-runtime-parameters
- Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.

DETAILS:
- hard limit: {hard_limit}
- soft limit: {soft_limit}
- worker_id_to_actor_count: {worker_id_to_actor_count:?}"#,
            )
        };

        let limits = self.env().meta_client().get_cluster_limits().await?;
        for limit in limits {
            match limit {
                cluster_limit::ClusterLimit::ActorCount(l) => {
                    if l.exceed_hard_limit() {
                        // Hard limit: refuse to proceed.
                        return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
                            l, true,
                        ))));
                    } else if l.exceed_soft_limit() {
                        // Soft limit: warn but allow.
                        self.notice_to_user(gen_message(l, false));
                    }
                }
            }
        }
        Ok(())
    }
1545}
1546
/// Process-global handle to the session manager, set once during frontend startup.
pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
    std::sync::OnceLock::new();
1549
/// Manages the lifecycle of all sessions on this frontend node.
pub struct SessionManagerImpl {
    env: FrontendEnv,
    // Join handles of background tasks spawned by `FrontendEnv::init`; held so the
    // tasks are tied to the manager's lifetime.
    _join_handles: Vec<JoinHandle<()>>,
    // Shutdown senders paired with the background tasks above.
    _shutdown_senders: Vec<Sender<()>>,
    // Monotonic counter — NOTE(review): appears to feed session-id allocation in
    // `connect_inner`; confirm against its definition below.
    number: AtomicI32,
}
1556
impl SessionManager for SessionManagerImpl {
    type Error = RwError;
    type Session = SessionImpl;

    /// Create an internal (non-client) session bound to `database_id`, using a
    /// placeholder peer address pointing at the meta port.
    fn create_dummy_session(&self, database_id: DatabaseId) -> Result<Arc<Self::Session>> {
        let dummy_addr = Address::Tcp(SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
            5691, // port of meta
        ));

        // Always use the built-in super user for dummy sessions.
        // This avoids permission checks tied to a specific user_id.
        self.connect_inner(
            database_id,
            risingwave_common::catalog::DEFAULT_SUPER_USER,
            Arc::new(dummy_addr),
        )
    }

    /// Create a session for a real client connection, resolving the database name
    /// to its id first.
    fn connect(
        &self,
        database: &str,
        user_name: &str,
        peer_addr: AddressRef,
    ) -> Result<Arc<Self::Session>> {
        let catalog_reader = self.env.catalog_reader();
        let reader = catalog_reader.read_guard();
        let database_id = reader.get_database_by_name(database)?.id();

        self.connect_inner(database_id, user_name, peer_addr)
    }

    /// Used when cancel request happened.
    fn cancel_queries_in_session(&self, session_id: SessionId) {
        self.env.cancel_queries_in_session(session_id);
    }

    /// Used when cancel request happened.
    fn cancel_creating_jobs_in_session(&self, session_id: SessionId) {
        self.env.cancel_creating_jobs_in_session(session_id);
    }

    /// Remove the session from the session map when its connection closes.
    fn end_session(&self, session: &Self::Session) {
        self.delete_session(&session.session_id());
    }

    /// Graceful shutdown of the whole frontend's session layer.
    async fn shutdown(&self) {
        // Clean up the session map.
        self.env.sessions_map().write().clear();
        // Unregister from the meta service.
        self.env.meta_client().try_unregister().await;
    }
}
1609
impl SessionManagerImpl {
    /// Initializes the frontend environment and wraps it in a new manager.
    ///
    /// Join handles and shutdown senders returned by [`FrontendEnv::init`]
    /// are stored but otherwise unused here.
    pub async fn new(opts: FrontendOpts) -> Result<Self> {
        // TODO(shutdown): only save join handles that **need** to be shutdown
        let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?;
        Ok(Self {
            env,
            _join_handles: join_handles,
            _shutdown_senders: shutdown_senders,
            number: AtomicI32::new(0),
        })
    }

    /// Returns the shared frontend environment.
    pub(crate) fn env(&self) -> &FrontendEnv {
        &self.env
    }

    /// Registers a session in the shared sessions map and updates the
    /// `active_sessions` gauge to the new map size.
    fn insert_session(&self, session: Arc<SessionImpl>) {
        // Take the length while still holding the write guard so the gauge
        // is consistent with the map contents at insertion time.
        let active_sessions = {
            let mut write_guard = self.env.sessions_map.write();
            write_guard.insert(session.id(), session);
            write_guard.len()
        };
        self.env
            .frontend_metrics
            .active_sessions
            .set(active_sessions as i64);
    }

    /// Removes a session from the shared sessions map and updates the
    /// `active_sessions` gauge to the new map size.
    fn delete_session(&self, session_id: &SessionId) {
        let active_sessions = {
            let mut write_guard = self.env.sessions_map.write();
            write_guard.remove(session_id);
            write_guard.len()
        };
        self.env
            .frontend_metrics
            .active_sessions
            .set(active_sessions as i64);
    }

    /// Shared connection routine used by both `connect` and
    /// `create_dummy_session`:
    /// 1. validates that the user exists, can log in, and may connect to the
    ///    database;
    /// 2. matches the connection against the HBA configuration to select an
    ///    authentication method;
    /// 3. creates a `SessionImpl`, assigns it an id, and registers it in the
    ///    sessions map (so cancel requests can find it).
    fn connect_inner(
        &self,
        database_id: DatabaseId,
        user_name: &str,
        peer_addr: AddressRef,
    ) -> Result<Arc<SessionImpl>> {
        let catalog_reader = self.env.catalog_reader();
        let reader = catalog_reader.read_guard();
        let (database_name, database_owner) = {
            let db = reader.get_database_by_id(database_id)?;
            (db.name(), db.owner())
        };

        let user_reader = self.env.user_info_reader();
        let reader = user_reader.read_guard();
        if let Some(user) = reader.get_user_by_name(user_name) {
            if !user.can_login {
                bail_permission_denied!("User {} is not allowed to login", user_name);
            }
            // Superusers and the database owner may always connect; everyone
            // else needs an explicit CONNECT privilege on the database.
            let has_privilege = user.has_privilege(database_id, AclMode::Connect);
            if !user.is_super && database_owner != user.id && !has_privilege {
                bail_permission_denied!("User does not have CONNECT privilege.");
            }

            // Check HBA configuration for LDAP authentication
            let (connection_type, client_addr) = match peer_addr.as_ref() {
                Address::Tcp(socket_addr) => (ConnectionType::Host, Some(&socket_addr.ip())),
                Address::Unix(_) => (ConnectionType::Local, None),
            };
            tracing::debug!(
                "receive connection: type={:?}, client_addr={:?}",
                connection_type,
                client_addr
            );

            let hba_entry_opt = self.env.frontend_config().hba_config.find_matching_entry(
                &connection_type,
                database_name,
                user_name,
                client_addr,
            );

            // NOTE(review): after this `let else`, `hba_entry_opt` is the
            // matched entry itself, no longer an `Option` — the name is a
            // leftover from the shadowed binding above.
            let Some(hba_entry_opt) = hba_entry_opt else {
                bail_permission_denied!(
                    "no pg_hba.conf entry for host \"{peer_addr}\", user \"{user_name}\", database \"{database_name}\""
                );
            };

            // Determine the user authenticator based on the user's auth info.
            let authenticator_by_info = || -> Result<UserAuthenticator> {
                let authenticator = match &user.auth_info {
                    None => UserAuthenticator::None,
                    Some(auth_info) => match auth_info.encryption_type() {
                        EncryptionType::Plaintext => {
                            UserAuthenticator::ClearText(auth_info.encrypted_value.clone())
                        }
                        EncryptionType::Md5 => {
                            // Fresh random salt generated per connection attempt.
                            let mut salt = [0; 4];
                            let mut rng = rand::rng();
                            rng.fill_bytes(&mut salt);
                            UserAuthenticator::Md5WithSalt {
                                encrypted_password: md5_hash_with_salt(
                                    &auth_info.encrypted_value,
                                    &salt,
                                ),
                                salt,
                            }
                        }
                        EncryptionType::Oauth => UserAuthenticator::OAuth {
                            metadata: auth_info.metadata.clone(),
                            cluster_id: self.env.meta_client().cluster_id().to_owned(),
                        },
                        _ => {
                            bail_protocol_error!(
                                "Unsupported auth type: {}",
                                auth_info.encryption_type().as_str_name()
                            );
                        }
                    },
                };
                Ok(authenticator)
            };

            // Combine the HBA-configured method with the user's stored auth
            // info; mismatching combinations fall through to the rejection arm.
            let user_authenticator = match (&hba_entry_opt.auth_method, &user.auth_info) {
                (AuthMethod::Trust, _) => UserAuthenticator::None,
                // For backward compatibility, we allow password auth method to work with any auth info.
                (AuthMethod::Password, _) => authenticator_by_info()?,
                (AuthMethod::Md5, Some(auth_info))
                    if auth_info.encryption_type() == EncryptionType::Md5 =>
                {
                    authenticator_by_info()?
                }
                (AuthMethod::OAuth, Some(auth_info))
                    if auth_info.encryption_type() == EncryptionType::Oauth =>
                {
                    authenticator_by_info()?
                }
                (AuthMethod::Ldap, _) => {
                    UserAuthenticator::Ldap(user_name.to_owned(), hba_entry_opt.clone())
                }
                _ => {
                    bail_permission_denied!(
                        "password authentication failed for user \"{user_name}\""
                    );
                }
            };

            // Assign a session id and insert into sessions map (for cancel request).
            let secret_key = self.number.fetch_add(1, Ordering::Relaxed);
            // Use a trivial strategy: process_id and secret_key are equal.
            let id = (secret_key, secret_key);
            // Read session params snapshot from frontend env.
            let session_config = self.env.session_params_snapshot();

            let session_impl: Arc<SessionImpl> = SessionImpl::new(
                self.env.clone(),
                AuthContext::new(database_name.to_owned(), user_name.to_owned(), user.id),
                user_authenticator,
                id,
                peer_addr,
                session_config,
            )
            .into();
            self.insert_session(session_impl.clone());

            Ok(session_impl)
        } else {
            bail_catalog_error!("Role {} does not exist", user_name);
        }
    }
}
1781
impl Session for SessionImpl {
    type Error = RwError;
    type Portal = Portal;
    type PreparedStatement = PrepareStatement;
    type ValuesStream = PgResponseStream;

    /// A copy of `run_statement` but exclude the parser part so each run must be at most one
    /// statement. The str sql use the `to_string` of AST. Consider Reuse later.
    async fn run_one_query(
        self: Arc<Self>,
        stmt: Statement,
        format: Format,
    ) -> Result<PgResponse<PgResponseStream>> {
        let string = stmt.to_string();
        let sql_str = string.as_str();
        let sql: Arc<str> = Arc::from(sql_str);
        // The handle can be slow. Release potential large String early.
        drop(string);
        let rsp = Box::pin(handle(self, stmt, sql, vec![format])).await?;
        Ok(rsp)
    }

    /// Returns the authenticator chosen for this session at connect time.
    fn user_authenticator(&self) -> &UserAuthenticator {
        &self.user_authenticator
    }

    /// Returns the `(process_id, secret_key)` pair identifying this session.
    fn id(&self) -> SessionId {
        self.id
    }

    /// Extended-protocol `Parse`: a `None` statement (empty query) yields
    /// `PrepareStatement::Empty` without touching the handler.
    async fn parse(
        self: Arc<Self>,
        statement: Option<Statement>,
        params_types: Vec<Option<DataType>>,
    ) -> Result<PrepareStatement> {
        Ok(if let Some(statement) = statement {
            handle_parse(self, statement, params_types).await?
        } else {
            PrepareStatement::Empty
        })
    }

    /// Extended-protocol `Bind`: binds parameter values and formats to a
    /// prepared statement, producing a portal.
    fn bind(
        self: Arc<Self>,
        prepare_statement: PrepareStatement,
        params: Vec<Option<Bytes>>,
        param_formats: Vec<Format>,
        result_formats: Vec<Format>,
    ) -> Result<Portal> {
        handle_bind(prepare_statement, params, param_formats, result_formats)
    }

    /// Extended-protocol `Execute`: runs a bound portal.
    async fn execute(self: Arc<Self>, portal: Portal) -> Result<PgResponse<PgResponseStream>> {
        let rsp = Box::pin(handle_execute(self, portal)).await?;
        Ok(rsp)
    }

    /// Extended-protocol `Describe` (statement variant): returns the
    /// statement's parameter types and row description.
    fn describe_statement(
        self: Arc<Self>,
        prepare_statement: PrepareStatement,
    ) -> Result<(Vec<DataType>, Vec<PgFieldDescriptor>)> {
        Ok(match prepare_statement {
            PrepareStatement::Empty => (vec![], vec![]),
            PrepareStatement::Prepared(prepare_statement) => (
                prepare_statement.bound_result.param_types,
                infer(
                    Some(prepare_statement.bound_result.bound),
                    prepare_statement.statement,
                )?,
            ),
            PrepareStatement::PureStatement(statement) => (vec![], infer(None, statement)?),
        })
    }

    /// Extended-protocol `Describe` (portal variant): returns the row
    /// description, switching columns to binary where the portal's result
    /// formats ask for it.
    fn describe_portal(self: Arc<Self>, portal: Portal) -> Result<Vec<PgFieldDescriptor>> {
        match portal {
            Portal::Empty => Ok(vec![]),
            Portal::Portal(portal) => {
                let mut columns = infer(Some(portal.bound_result.bound), portal.statement)?;
                let formats = FormatIterator::new(&portal.result_formats, columns.len())
                    .map_err(|e| RwError::from(ErrorCode::ProtocolError(e)))?;
                columns.iter_mut().zip_eq_fast(formats).for_each(|(c, f)| {
                    if f == Format::Binary {
                        c.set_to_binary()
                    }
                });
                Ok(columns)
            }
            Portal::PureStatement(statement) => Ok(infer(None, statement)?),
        }
    }

    /// Reads a single session configuration value by key.
    fn get_config(&self, key: &str) -> Result<String> {
        self.config().get(key).map_err(Into::into)
    }

    /// Sets a session configuration value, delegating to the inherent
    /// `SessionImpl::set_config` (disambiguated via `Self::`).
    fn set_config(&self, key: &str, value: String) -> Result<String> {
        Self::set_config(self, key, value)
    }

    /// Waits for the next notice message destined for this session.
    async fn next_notice(self: &Arc<Self>) -> String {
        // The receiver mutex is re-acquired on every poll of this future.
        std::future::poll_fn(|cx| self.clone().notice_rx.lock().poll_recv(cx))
            .await
            .expect("notice channel should not be closed")
    }

    /// Maps the internal transaction state onto the wire-protocol status.
    fn transaction_status(&self) -> TransactionStatus {
        match &*self.txn.lock() {
            transaction::State::Initial | transaction::State::Implicit(_) => {
                TransactionStatus::Idle
            }
            transaction::State::Explicit(_) => TransactionStatus::InTransaction,
            // TODO: failed transaction
        }
    }

    /// Init and return an `ExecContextGuard` which could be used as a guard to represent the execution flow.
    fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard {
        let exec_context = Arc::new(ExecContext {
            running_sql: sql,
            last_instant: Instant::now(),
            last_idle_instant: self.last_idle_instant.clone(),
        });
        // Only a weak reference is stored: once the returned guard (the sole
        // strong reference) is dropped, `upgrade()` fails and
        // `check_idle_in_transaction_timeout` treats the session as idle.
        *self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
        // unset idle state, since there is a sql running
        *self.last_idle_instant.lock() = None;
        ExecContextGuard::new(exec_context)
    }

    /// Check whether idle transaction timeout.
    /// If yes, unpin snapshot and return an `IdleInTxnTimeout` error.
    fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
        // In transaction.
        if matches!(self.transaction_status(), TransactionStatus::InTransaction) {
            let idle_in_transaction_session_timeout =
                self.config().idle_in_transaction_session_timeout() as u128;
            // Idle transaction timeout has been enabled.
            if idle_in_transaction_session_timeout != 0 {
                // Hold the `exec_context` lock to ensure no new sql coming when unpin_snapshot.
                let guard = self.exec_context.lock();
                // No running sql i.e. idle
                if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() {
                    // Idle timeout.
                    if let Some(elapse_since_last_idle_instant) =
                        self.elapse_since_last_idle_instant()
                        && elapse_since_last_idle_instant > idle_in_transaction_session_timeout
                    {
                        return Err(PsqlError::IdleInTxnTimeout);
                    }
                }
            }
        }
        Ok(())
    }

    /// Returns the session user's name.
    fn user(&self) -> String {
        self.user_name()
    }
}
1941
1942/// Returns row description of the statement
1943fn infer(bound: Option<BoundStatement>, stmt: Statement) -> Result<Vec<PgFieldDescriptor>> {
1944    match stmt {
1945        Statement::Query(_)
1946        | Statement::Insert { .. }
1947        | Statement::Delete { .. }
1948        | Statement::Update { .. }
1949        | Statement::FetchCursor { .. } => Ok(bound
1950            .unwrap()
1951            .output_fields()
1952            .iter()
1953            .map(to_pg_field)
1954            .collect()),
1955        Statement::ShowObjects {
1956            object: show_object,
1957            ..
1958        } => Ok(infer_show_object(&show_object)),
1959        Statement::ShowCreateObject { .. } => Ok(infer_show_create_object()),
1960        Statement::ShowTransactionIsolationLevel => {
1961            let name = "transaction_isolation";
1962            Ok(infer_show_variable(name))
1963        }
1964        Statement::ShowVariable { variable } => {
1965            let name = &variable[0].real_value().to_lowercase();
1966            Ok(infer_show_variable(name))
1967        }
1968        Statement::Describe { name: _, kind } => Ok(infer_describe(&kind)),
1969        Statement::Explain { .. } => Ok(vec![PgFieldDescriptor::new(
1970            "QUERY PLAN".to_owned(),
1971            DataType::Varchar.to_oid(),
1972            DataType::Varchar.type_len(),
1973        )]),
1974        _ => Ok(vec![]),
1975    }
1976}
1977
/// Identifies a frontend process on a specific worker node; rendered and
/// parsed as `"{worker_id}:{process_id}"` (see `Display` / `TryFrom<String>`).
pub struct WorkerProcessId {
    pub worker_id: WorkerId,
    pub process_id: i32,
}
1982
1983impl WorkerProcessId {
1984    pub fn new(worker_id: WorkerId, process_id: i32) -> Self {
1985        Self {
1986            worker_id,
1987            process_id,
1988        }
1989    }
1990}
1991
1992impl Display for WorkerProcessId {
1993    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1994        write!(f, "{}:{}", self.worker_id, self.process_id)
1995    }
1996}
1997
1998impl TryFrom<String> for WorkerProcessId {
1999    type Error = String;
2000
2001    fn try_from(worker_process_id: String) -> std::result::Result<Self, Self::Error> {
2002        const INVALID: &str = "invalid WorkerProcessId";
2003        let splits: Vec<&str> = worker_process_id.split(":").collect();
2004        if splits.len() != 2 {
2005            return Err(INVALID.to_owned());
2006        }
2007        let Ok(worker_id) = splits[0].parse::<u32>() else {
2008            return Err(INVALID.to_owned());
2009        };
2010        let Ok(process_id) = splits[1].parse::<i32>() else {
2011            return Err(INVALID.to_owned());
2012        };
2013        Ok(WorkerProcessId::new(worker_id.into(), process_id))
2014    }
2015}
2016
2017pub fn cancel_queries_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2018    let guard = sessions_map.read();
2019    if let Some(session) = guard.get(&session_id) {
2020        session.cancel_current_query();
2021        true
2022    } else {
2023        info!("Current session finished, ignoring cancel query request");
2024        false
2025    }
2026}
2027
2028pub fn cancel_creating_jobs_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2029    let guard = sessions_map.read();
2030    if let Some(session) = guard.get(&session_id) {
2031        session.cancel_current_creating_job();
2032        true
2033    } else {
2034        info!("Current session finished, ignoring cancel creating request");
2035        false
2036    }
2037}