risingwave_frontend/
session.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::io::{Error, ErrorKind};
18use std::iter;
19use std::net::{IpAddr, Ipv4Addr, SocketAddr};
20use std::sync::atomic::{AtomicI32, Ordering};
21use std::sync::{Arc, Weak};
22use std::time::{Duration, Instant};
23
24use anyhow::anyhow;
25use bytes::Bytes;
26use either::Either;
27use itertools::Itertools;
28use parking_lot::{Mutex, RwLock, RwLockReadGuard};
29use pgwire::error::{PsqlError, PsqlResult};
30use pgwire::net::{Address, AddressRef};
31use pgwire::pg_field_descriptor::PgFieldDescriptor;
32use pgwire::pg_message::TransactionStatus;
33use pgwire::pg_response::{PgResponse, StatementType};
34use pgwire::pg_server::{
35    BoxedError, ExecContext, ExecContextGuard, Session, SessionId, SessionManager,
36    UserAuthenticator,
37};
38use pgwire::types::{Format, FormatIterator};
39use prometheus_http_query::Client as PrometheusClient;
40use rand::RngCore;
41use risingwave_batch::monitor::{BatchSpillMetrics, GLOBAL_BATCH_SPILL_METRICS};
42use risingwave_batch::spill::spill_op::SpillOp;
43use risingwave_batch::task::{ShutdownSender, ShutdownToken};
44use risingwave_batch::worker_manager::worker_node_manager::{
45    WorkerNodeManager, WorkerNodeManagerRef,
46};
47use risingwave_common::acl::AclMode;
48#[cfg(test)]
49use risingwave_common::catalog::{
50    DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
51};
52use risingwave_common::config::{
53    AuthMethod, BatchConfig, ConnectionType, FrontendConfig, MetaConfig, MetricLevel,
54    RpcClientConfig, StreamingConfig, UdfConfig, load_config,
55};
56use risingwave_common::id::WorkerId;
57use risingwave_common::memory::MemoryContext;
58use risingwave_common::secret::LocalSecretManager;
59use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
60use risingwave_common::system_param::local_manager::{
61    LocalSystemParamsManager, LocalSystemParamsManagerRef,
62};
63use risingwave_common::telemetry::manager::TelemetryManager;
64use risingwave_common::telemetry::telemetry_env_enabled;
65use risingwave_common::types::DataType;
66use risingwave_common::util::addr::HostAddr;
67use risingwave_common::util::cluster_limit;
68use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
69use risingwave_common::util::iter_util::ZipEqFast;
70use risingwave_common::util::pretty_bytes::convert;
71use risingwave_common::util::runtime::BackgroundShutdownRuntime;
72use risingwave_common::{GIT_SHA, RW_VERSION};
73use risingwave_common_heap_profiling::HeapProfiler;
74use risingwave_common_service::{MetricsManager, ObserverManager};
75use risingwave_connector::source::monitor::{GLOBAL_SOURCE_METRICS, SourceMetrics};
76use risingwave_pb::common::WorkerType;
77use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
78use risingwave_pb::configured_monitor_service_server;
79use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
80use risingwave_pb::health::health_server::HealthServer;
81use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
82use risingwave_pb::user::auth_info::EncryptionType;
83use risingwave_rpc_client::{
84    ComputeClientPool, ComputeClientPoolRef, FrontendClientPool, FrontendClientPoolRef, MetaClient,
85    MonitorClientPool, MonitorClientPoolRef,
86};
87use risingwave_sqlparser::ast::{ObjectName, Statement};
88use risingwave_sqlparser::parser::Parser;
89use thiserror::Error;
90use thiserror_ext::AsReport;
91use tokio::runtime::Builder;
92use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
93use tokio::sync::oneshot::Sender;
94use tokio::sync::watch;
95use tokio::task::JoinHandle;
96use tracing::{error, info};
97
98use self::cursor_manager::CursorManager;
99use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError};
100use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl};
101use crate::catalog::connection_catalog::ConnectionCatalog;
102use crate::catalog::root_catalog::{Catalog, SchemaPath};
103use crate::catalog::secret_catalog::SecretCatalog;
104use crate::catalog::source_catalog::SourceCatalog;
105use crate::catalog::subscription_catalog::SubscriptionCatalog;
106use crate::catalog::{
107    CatalogError, CatalogErrorInner, DatabaseId, OwnedByUserCatalog, SchemaId, TableId,
108    check_schema_writable,
109};
110use crate::error::{
111    ErrorCode, Result, RwError, bail_catalog_error, bail_permission_denied, bail_protocol_error,
112};
113use crate::handler::describe::infer_describe;
114use crate::handler::extended_handle::{
115    Portal, PrepareStatement, handle_bind, handle_execute, handle_parse,
116};
117use crate::handler::privilege::ObjectCheckItem;
118use crate::handler::show::{infer_show_create_object, infer_show_object};
119use crate::handler::util::to_pg_field;
120use crate::handler::variable::infer_show_variable;
121use crate::handler::{RwPgResponse, handle};
122use crate::health_service::HealthServiceImpl;
123use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl};
124use crate::monitor::{CursorMetrics, FrontendMetrics, GLOBAL_FRONTEND_METRICS};
125use crate::observer::FrontendObserverNode;
126use crate::rpc::{FrontendServiceImpl, MonitorServiceImpl};
127use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef};
128use crate::scheduler::{
129    DistributedQueryMetrics, GLOBAL_DISTRIBUTED_QUERY_METRICS, HummockSnapshotManager,
130    HummockSnapshotManagerRef, QueryManager,
131};
132use crate::telemetry::FrontendTelemetryCreator;
133use crate::user::UserId;
134use crate::user::user_authentication::md5_hash_with_salt;
135use crate::user::user_manager::UserInfoManager;
136use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl};
137use crate::{FrontendOpts, PgResponseStream, TableCatalog};
138
139pub(crate) mod current;
140pub(crate) mod cursor_manager;
141pub(crate) mod transaction;
142
143/// The global environment for the frontend server.
144#[derive(Clone)]
145pub(crate) struct FrontendEnv {
146    // Different session may access catalog at the same time and catalog is protected by a
147    // RwLock.
148    meta_client: Arc<dyn FrontendMetaClient>,
149    catalog_writer: Arc<dyn CatalogWriter>,
150    catalog_reader: CatalogReader,
151    user_info_writer: Arc<dyn UserInfoWriter>,
152    user_info_reader: UserInfoReader,
153    worker_node_manager: WorkerNodeManagerRef,
154    query_manager: QueryManager,
155    hummock_snapshot_manager: HummockSnapshotManagerRef,
156    system_params_manager: LocalSystemParamsManagerRef,
157    session_params: Arc<RwLock<SessionConfig>>,
158
159    server_addr: HostAddr,
160    client_pool: ComputeClientPoolRef,
161    frontend_client_pool: FrontendClientPoolRef,
162    monitor_client_pool: MonitorClientPoolRef,
163
164    /// Each session is identified by (`process_id`,
165    /// `secret_key`). When Cancel Request received, find corresponding session and cancel all
166    /// running queries.
167    sessions_map: SessionMapRef,
168
169    pub frontend_metrics: Arc<FrontendMetrics>,
170
171    pub cursor_metrics: Arc<CursorMetrics>,
172
173    source_metrics: Arc<SourceMetrics>,
174
175    /// Batch spill metrics
176    spill_metrics: Arc<BatchSpillMetrics>,
177
178    batch_config: BatchConfig,
179    frontend_config: FrontendConfig,
180    #[expect(dead_code)]
181    meta_config: MetaConfig,
182    streaming_config: StreamingConfig,
183    udf_config: UdfConfig,
184
185    /// Track creating streaming jobs, used to cancel creating streaming job when cancel request
186    /// received.
187    creating_streaming_job_tracker: StreamingJobTrackerRef,
188
189    /// Runtime for compute intensive tasks in frontend, e.g. executors in local mode,
190    /// root stage in mpp mode.
191    compute_runtime: Arc<BackgroundShutdownRuntime>,
192
193    /// Memory context used for batch executors in frontend.
194    mem_context: MemoryContext,
195
196    #[cfg(feature = "datafusion")]
197    /// Shared budget context for **DataFusion** spillable operators.
198    /// Created by [`crate::datafusion::execute::memory_ctx::create_df_spillable_budget_ctx`];
199    /// caps total spillable usage so unspillable operators always have headroom.
200    /// Not used by the RisingWave batch engine.
201    df_spillable_budget_ctx: MemoryContext,
202
203    /// address of the serverless backfill controller.
204    serverless_backfill_controller_addr: String,
205
206    /// Prometheus client for querying metrics.
207    prometheus_client: Option<PrometheusClient>,
208
209    /// The additional selector used when querying Prometheus.
210    prometheus_selector: String,
211}
212
213/// Session map identified by `(process_id, secret_key)`
214pub type SessionMapRef = Arc<RwLock<HashMap<(i32, i32), Arc<SessionImpl>>>>;
215
216/// The proportion of frontend memory used for batch processing.
217const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5;
218
219impl FrontendEnv {
220    pub fn mock() -> Self {
221        use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter};
222
223        let catalog = Arc::new(RwLock::new(Catalog::default()));
224        let meta_client = Arc::new(MockFrontendMetaClient {});
225        let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
226        let catalog_writer = Arc::new(MockCatalogWriter::new(
227            catalog.clone(),
228            hummock_snapshot_manager.clone(),
229        ));
230        let catalog_reader = CatalogReader::new(catalog);
231        let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
232        let user_info_writer = Arc::new(MockUserInfoWriter::new(user_info_manager.clone()));
233        let user_info_reader = UserInfoReader::new(user_info_manager);
234        let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
235        let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
236        let compute_client_pool = Arc::new(ComputeClientPool::for_test());
237        let frontend_client_pool = Arc::new(FrontendClientPool::for_test());
238        let monitor_client_pool = Arc::new(MonitorClientPool::for_test());
239        let query_manager = QueryManager::new(
240            worker_node_manager.clone(),
241            compute_client_pool,
242            catalog_reader.clone(),
243            Arc::new(DistributedQueryMetrics::for_test()),
244            None,
245            None,
246        );
247        let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
248        let client_pool = Arc::new(ComputeClientPool::for_test());
249        let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
250        let runtime = {
251            let mut builder = Builder::new_multi_thread();
252            if let Some(frontend_compute_runtime_worker_threads) =
253                load_config("", FrontendOpts::default())
254                    .batch
255                    .frontend_compute_runtime_worker_threads
256            {
257                builder.worker_threads(frontend_compute_runtime_worker_threads);
258            }
259            builder
260                .thread_name("rw-batch-local")
261                .enable_all()
262                .build()
263                .unwrap()
264        };
265        let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
266        let sessions_map = Arc::new(RwLock::new(HashMap::new()));
267        Self {
268            meta_client,
269            catalog_writer,
270            catalog_reader,
271            user_info_writer,
272            user_info_reader,
273            worker_node_manager,
274            query_manager,
275            hummock_snapshot_manager,
276            system_params_manager,
277            session_params: Default::default(),
278            server_addr,
279            client_pool,
280            frontend_client_pool,
281            monitor_client_pool,
282            sessions_map,
283            frontend_metrics: Arc::new(FrontendMetrics::for_test()),
284            cursor_metrics: Arc::new(CursorMetrics::for_test()),
285            batch_config: BatchConfig::default(),
286            frontend_config: FrontendConfig::default(),
287            meta_config: MetaConfig::default(),
288            streaming_config: StreamingConfig::default(),
289            udf_config: UdfConfig::default(),
290            source_metrics: Arc::new(SourceMetrics::default()),
291            spill_metrics: BatchSpillMetrics::for_test(),
292            creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
293            compute_runtime,
294            mem_context: MemoryContext::none(),
295            #[cfg(feature = "datafusion")]
296            df_spillable_budget_ctx: MemoryContext::none(),
297            serverless_backfill_controller_addr: Default::default(),
298            prometheus_client: None,
299            prometheus_selector: String::new(),
300        }
301    }
302
303    pub(crate) fn set_frontend_config_for_test(&mut self, frontend_config: FrontendConfig) {
304        self.frontend_config = frontend_config;
305    }
306
307    pub async fn init(opts: FrontendOpts) -> Result<(Self, Vec<JoinHandle<()>>, Vec<Sender<()>>)> {
308        let config = load_config(&opts.config_path, &opts);
309        info!("Starting frontend node");
310        info!("> config: {:?}", config);
311        info!(
312            "> debug assertions: {}",
313            if cfg!(debug_assertions) { "on" } else { "off" }
314        );
315        info!("> version: {} ({})", RW_VERSION, GIT_SHA);
316
317        let frontend_address: HostAddr = opts
318            .advertise_addr
319            .as_ref()
320            .unwrap_or_else(|| {
321                tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
322                &opts.listen_addr
323            })
324            .parse()
325            .unwrap();
326        info!("advertise addr is {}", frontend_address);
327
328        let rpc_addr: HostAddr = opts.frontend_rpc_listener_addr.parse().unwrap();
329        let internal_rpc_host_addr = HostAddr {
330            // Use the host of advertise address for the frontend rpc address.
331            host: frontend_address.host.clone(),
332            port: rpc_addr.port,
333        };
334        // Register in meta by calling `AddWorkerNode` RPC.
335        let (meta_client, system_params_reader) = MetaClient::register_new(
336            opts.meta_addr,
337            WorkerType::Frontend,
338            &frontend_address,
339            AddWorkerNodeProperty {
340                internal_rpc_host_addr: internal_rpc_host_addr.to_string(),
341                ..Default::default()
342            },
343            Arc::new(config.meta.clone()),
344        )
345        .await;
346
347        let worker_id = meta_client.worker_id();
348        info!("Assigned worker node id {}", worker_id);
349
350        let (heartbeat_join_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
351            meta_client.clone(),
352            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
353        );
354        let mut join_handles = vec![heartbeat_join_handle];
355        let mut shutdown_senders = vec![heartbeat_shutdown_sender];
356
357        let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
358        let hummock_snapshot_manager =
359            Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));
360
361        let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
362        let catalog = Arc::new(RwLock::new(Catalog::default()));
363        let catalog_writer = Arc::new(CatalogWriterImpl::new(
364            meta_client.clone(),
365            catalog_updated_rx.clone(),
366            hummock_snapshot_manager.clone(),
367        ));
368        let catalog_reader = CatalogReader::new(catalog.clone());
369
370        let worker_node_manager = Arc::new(WorkerNodeManager::new());
371
372        let compute_client_pool = Arc::new(ComputeClientPool::new(
373            config.batch_exchange_connection_pool_size(),
374            config.batch.developer.compute_client_config.clone(),
375        ));
376        let frontend_client_pool = Arc::new(FrontendClientPool::new(
377            1,
378            config.batch.developer.frontend_client_config.clone(),
379        ));
380        let monitor_client_pool = Arc::new(MonitorClientPool::new(1, RpcClientConfig::default()));
381        let query_manager = QueryManager::new(
382            worker_node_manager.clone(),
383            compute_client_pool.clone(),
384            catalog_reader.clone(),
385            Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
386            config.batch.distributed_query_limit,
387            config.batch.max_batch_queries_per_frontend_node,
388        );
389
390        let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
391        let user_info_reader = UserInfoReader::new(user_info_manager.clone());
392        let user_info_writer = Arc::new(UserInfoWriterImpl::new(
393            meta_client.clone(),
394            catalog_updated_rx,
395        ));
396
397        let system_params_manager =
398            Arc::new(LocalSystemParamsManager::new(system_params_reader.clone()));
399
400        LocalSecretManager::init(
401            opts.temp_secret_file_dir,
402            meta_client.cluster_id().to_owned(),
403            worker_id,
404        );
405
406        // This `session_params` should be initialized during the initial notification in `observer_manager`
407        let session_params = Arc::new(RwLock::new(SessionConfig::default()));
408        let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
409        let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));
410
411        let frontend_observer_node = FrontendObserverNode::new(
412            worker_node_manager.clone(),
413            catalog,
414            catalog_updated_tx,
415            user_info_manager,
416            hummock_snapshot_manager.clone(),
417            system_params_manager.clone(),
418            session_params.clone(),
419            compute_client_pool.clone(),
420        );
421        let observer_manager =
422            ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
423                .await;
424        let observer_join_handle = observer_manager.start().await;
425        join_handles.push(observer_join_handle);
426
427        meta_client.activate(&frontend_address).await?;
428
429        let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
430        let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
431        let spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());
432
433        if config.server.metrics_level > MetricLevel::Disabled {
434            MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
435        }
436
437        let health_srv = HealthServiceImpl::new();
438        let frontend_srv = FrontendServiceImpl::new(sessions_map.clone());
439        let monitor_srv = MonitorServiceImpl::new(config.server.clone());
440        let frontend_rpc_addr = opts.frontend_rpc_listener_addr.parse().unwrap();
441
442        let telemetry_manager = TelemetryManager::new(
443            Arc::new(meta_client.clone()),
444            Arc::new(FrontendTelemetryCreator::new()),
445        );
446
447        // if the toml config file or env variable disables telemetry, do not watch system params
448        // change because if any of configs disable telemetry, we should never start it
449        if config.server.telemetry_enabled && telemetry_env_enabled() {
450            let (join_handle, shutdown_sender) = telemetry_manager.start().await;
451            join_handles.push(join_handle);
452            shutdown_senders.push(shutdown_sender);
453        } else {
454            tracing::info!("Telemetry didn't start due to config");
455        }
456
457        tokio::spawn(async move {
458            tonic::transport::Server::builder()
459                .add_service(HealthServer::new(health_srv))
460                .add_service(FrontendServiceServer::new(frontend_srv))
461                .add_service(configured_monitor_service_server(
462                    MonitorServiceServer::new(monitor_srv),
463                ))
464                .serve(frontend_rpc_addr)
465                .await
466                .unwrap();
467        });
468        info!(
469            "Health Check RPC Listener is set up on {}",
470            opts.frontend_rpc_listener_addr.clone()
471        );
472
473        let creating_streaming_job_tracker =
474            Arc::new(StreamingJobTracker::new(frontend_meta_client.clone()));
475
476        let runtime = {
477            let mut builder = Builder::new_multi_thread();
478            if let Some(frontend_compute_runtime_worker_threads) =
479                config.batch.frontend_compute_runtime_worker_threads
480            {
481                builder.worker_threads(frontend_compute_runtime_worker_threads);
482            }
483            builder
484                .thread_name("rw-batch-local")
485                .enable_all()
486                .build()
487                .unwrap()
488        };
489        let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
490
491        let sessions = sessions_map.clone();
492        // Idle transaction background monitor
493        let join_handle = tokio::spawn(async move {
494            let mut check_idle_txn_interval =
495                tokio::time::interval(core::time::Duration::from_secs(10));
496            check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
497            check_idle_txn_interval.reset();
498            loop {
499                check_idle_txn_interval.tick().await;
500                sessions.read().values().for_each(|session| {
501                    let _ = session.check_idle_in_transaction_timeout();
502                })
503            }
504        });
505        join_handles.push(join_handle);
506
507        // Clean up the spill directory.
508        #[cfg(not(madsim))]
509        if config.batch.enable_spill {
510            SpillOp::clean_spill_directory()
511                .await
512                .map_err(|err| anyhow!(err))?;
513        }
514
515        let total_memory_bytes = opts.frontend_total_memory_bytes;
516        let heap_profiler =
517            HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
518        // Run a background heap profiler
519        heap_profiler.start();
520
521        let batch_memory_limit = total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION;
522        let mem_context = MemoryContext::root(
523            frontend_metrics.batch_total_mem.clone(),
524            batch_memory_limit as u64,
525        );
526        #[cfg(feature = "datafusion")]
527        let df_spillable_budget_ctx =
528            crate::datafusion::create_df_spillable_budget_ctx(&mem_context);
529
530        // Initialize Prometheus client if endpoint is provided
531        let prometheus_client = if let Some(ref endpoint) = opts.prometheus_endpoint {
532            match PrometheusClient::try_from(endpoint.as_str()) {
533                Ok(client) => {
534                    info!("Prometheus client initialized with endpoint: {}", endpoint);
535                    Some(client)
536                }
537                Err(e) => {
538                    error!(
539                        error = %e.as_report(),
540                        "failed to initialize Prometheus client",
541                    );
542                    None
543                }
544            }
545        } else {
546            None
547        };
548
549        // Initialize Prometheus selector
550        let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
551
552        info!(
553            "Frontend  total_memory: {} batch_memory: {}",
554            convert(total_memory_bytes as _),
555            convert(batch_memory_limit as _),
556        );
557
558        Ok((
559            Self {
560                catalog_reader,
561                catalog_writer,
562                user_info_reader,
563                user_info_writer,
564                worker_node_manager,
565                meta_client: frontend_meta_client,
566                query_manager,
567                hummock_snapshot_manager,
568                system_params_manager,
569                session_params,
570                server_addr: frontend_address,
571                client_pool: compute_client_pool,
572                frontend_client_pool,
573                monitor_client_pool,
574                frontend_metrics,
575                cursor_metrics,
576                spill_metrics,
577                sessions_map,
578                batch_config: config.batch,
579                frontend_config: config.frontend,
580                meta_config: config.meta,
581                streaming_config: config.streaming,
582                serverless_backfill_controller_addr: opts.serverless_backfill_controller_addr,
583                udf_config: config.udf,
584                source_metrics,
585                creating_streaming_job_tracker,
586                compute_runtime,
587                mem_context,
588                #[cfg(feature = "datafusion")]
589                df_spillable_budget_ctx,
590                prometheus_client,
591                prometheus_selector,
592            },
593            join_handles,
594            shutdown_senders,
595        ))
596    }
597
598    /// Get a reference to the frontend env's catalog writer.
599    ///
600    /// This method is intentionally private, and a write guard is required for the caller to
601    /// prove that the write operations are permitted in the current transaction.
602    fn catalog_writer(&self, _guard: transaction::WriteGuard) -> &dyn CatalogWriter {
603        &*self.catalog_writer
604    }
605
606    /// Get a reference to the frontend env's catalog reader.
607    pub fn catalog_reader(&self) -> &CatalogReader {
608        &self.catalog_reader
609    }
610
611    /// Get a reference to the frontend env's user info writer.
612    ///
613    /// This method is intentionally private, and a write guard is required for the caller to
614    /// prove that the write operations are permitted in the current transaction.
615    fn user_info_writer(&self, _guard: transaction::WriteGuard) -> &dyn UserInfoWriter {
616        &*self.user_info_writer
617    }
618
619    /// Get a reference to the frontend env's user info reader.
620    pub fn user_info_reader(&self) -> &UserInfoReader {
621        &self.user_info_reader
622    }
623
624    pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef {
625        self.worker_node_manager.clone()
626    }
627
628    pub fn meta_client(&self) -> &dyn FrontendMetaClient {
629        &*self.meta_client
630    }
631
632    pub fn meta_client_ref(&self) -> Arc<dyn FrontendMetaClient> {
633        self.meta_client.clone()
634    }
635
636    pub fn query_manager(&self) -> &QueryManager {
637        &self.query_manager
638    }
639
640    pub fn hummock_snapshot_manager(&self) -> &HummockSnapshotManagerRef {
641        &self.hummock_snapshot_manager
642    }
643
644    pub fn system_params_manager(&self) -> &LocalSystemParamsManagerRef {
645        &self.system_params_manager
646    }
647
648    pub fn session_params_snapshot(&self) -> SessionConfig {
649        self.session_params.read_recursive().clone()
650    }
651
652    pub fn sbc_address(&self) -> &String {
653        &self.serverless_backfill_controller_addr
654    }
655
656    /// Get a reference to the Prometheus client if available.
657    pub fn prometheus_client(&self) -> Option<&PrometheusClient> {
658        self.prometheus_client.as_ref()
659    }
660
661    /// Get the Prometheus selector string.
662    pub fn prometheus_selector(&self) -> &str {
663        &self.prometheus_selector
664    }
665
666    pub fn server_address(&self) -> &HostAddr {
667        &self.server_addr
668    }
669
670    pub fn client_pool(&self) -> ComputeClientPoolRef {
671        self.client_pool.clone()
672    }
673
674    pub fn frontend_client_pool(&self) -> FrontendClientPoolRef {
675        self.frontend_client_pool.clone()
676    }
677
678    pub fn monitor_client_pool(&self) -> MonitorClientPoolRef {
679        self.monitor_client_pool.clone()
680    }
681
682    pub fn batch_config(&self) -> &BatchConfig {
683        &self.batch_config
684    }
685
686    pub fn frontend_config(&self) -> &FrontendConfig {
687        &self.frontend_config
688    }
689
690    pub fn streaming_config(&self) -> &StreamingConfig {
691        &self.streaming_config
692    }
693
694    pub fn udf_config(&self) -> &UdfConfig {
695        &self.udf_config
696    }
697
698    pub fn source_metrics(&self) -> Arc<SourceMetrics> {
699        self.source_metrics.clone()
700    }
701
702    pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
703        self.spill_metrics.clone()
704    }
705
706    pub fn creating_streaming_job_tracker(&self) -> &StreamingJobTrackerRef {
707        &self.creating_streaming_job_tracker
708    }
709
710    pub fn sessions_map(&self) -> &SessionMapRef {
711        &self.sessions_map
712    }
713
714    pub fn compute_runtime(&self) -> Arc<BackgroundShutdownRuntime> {
715        self.compute_runtime.clone()
716    }
717
718    /// Cancel queries (i.e. batch queries) in session.
719    /// If the session exists return true, otherwise, return false.
720    pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {
721        cancel_queries_in_session(session_id, self.sessions_map.clone())
722    }
723
724    /// Cancel creating jobs (i.e. streaming queries) in session.
725    /// If the session exists return true, otherwise, return false.
726    pub fn cancel_creating_jobs_in_session(&self, session_id: SessionId) -> bool {
727        cancel_creating_jobs_in_session(session_id, self.sessions_map.clone())
728    }
729
730    pub fn mem_context(&self) -> MemoryContext {
731        self.mem_context.clone()
732    }
733
734    #[cfg(feature = "datafusion")]
735    pub fn df_spillable_budget_ctx(&self) -> MemoryContext {
736        self.df_spillable_budget_ctx.clone()
737    }
738}
739
740#[derive(Clone)]
741pub struct AuthContext {
742    pub database: String,
743    pub user_name: String,
744    pub user_id: UserId,
745}
746
747impl AuthContext {
748    pub fn new(database: String, user_name: String, user_id: UserId) -> Self {
749        Self {
750            database,
751            user_name,
752            user_id,
753        }
754    }
755}
756pub struct SessionImpl {
757    env: FrontendEnv,
758    auth_context: Arc<RwLock<AuthContext>>,
759    /// Used for user authentication.
760    user_authenticator: UserAuthenticator,
761    /// Stores the value of configurations.
762    config_map: Arc<RwLock<SessionConfig>>,
763
764    /// Channel sender for frontend handler to send notices.
765    notice_tx: UnboundedSender<String>,
766    /// Channel receiver for pgwire to take notices and send to clients.
767    notice_rx: Mutex<UnboundedReceiver<String>>,
768
769    /// Identified by `process_id`, `secret_key`. Corresponds to `SessionManager`.
770    id: (i32, i32),
771
772    /// Client address
773    peer_addr: AddressRef,
774
775    /// Transaction state.
776    /// TODO: get rid of the `Mutex` here as a workaround if the `Send` requirement of
777    /// async functions, there should actually be no contention.
778    txn: Arc<Mutex<transaction::State>>,
779
780    /// Query cancel flag.
781    /// This flag is set only when current query is executed in local mode, and used to cancel
782    /// local query.
783    current_query_cancel_flag: Mutex<Option<ShutdownSender>>,
784
785    /// execution context represents the lifetime of a running SQL in the current session
786    exec_context: Mutex<Option<Weak<ExecContext>>>,
787
788    /// Last idle instant
789    last_idle_instant: Arc<Mutex<Option<Instant>>>,
790
791    cursor_manager: Arc<CursorManager>,
792
793    /// temporary sources for the current session
794    temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,
795
796    /// staging catalogs for the current session
797    staging_catalog_manager: Arc<Mutex<StagingCatalogManager>>,
798}
799
800/// If TEMPORARY or TEMP is specified, the source is created as a temporary source.
801/// Temporary sources are automatically dropped at the end of a session
802/// Temporary sources are expected to be selected by batch queries, not streaming queries.
803/// Temporary sources currently are only used by cloud portal to preview the data during table and
804/// source creation, so it is a internal feature and not exposed to users.
805/// The current PR supports temporary source with minimum effort,
806/// so we don't care about the database name and schema name, but only care about the source name.
807/// Temporary sources can only be shown via `show sources` command but not other system tables.
808#[derive(Default, Clone)]
809pub struct TemporarySourceManager {
810    sources: HashMap<String, SourceCatalog>,
811}
812
813impl TemporarySourceManager {
814    pub fn new() -> Self {
815        Self {
816            sources: HashMap::new(),
817        }
818    }
819
820    pub fn create_source(&mut self, name: String, source: SourceCatalog) {
821        self.sources.insert(name, source);
822    }
823
824    pub fn drop_source(&mut self, name: &str) {
825        self.sources.remove(name);
826    }
827
828    pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
829        self.sources.get(name)
830    }
831
832    pub fn keys(&self) -> Vec<String> {
833        self.sources.keys().cloned().collect()
834    }
835}
836
837/// Staging catalog manager is used to manage the tables creating in the current session.
838#[derive(Default, Clone)]
839pub struct StagingCatalogManager {
840    // staging tables creating in the current session.
841    tables: HashMap<String, TableCatalog>,
842}
843
844impl StagingCatalogManager {
845    pub fn new() -> Self {
846        Self {
847            tables: HashMap::new(),
848        }
849    }
850
851    pub fn create_table(&mut self, name: String, table: TableCatalog) {
852        self.tables.insert(name, table);
853    }
854
855    pub fn drop_table(&mut self, name: &str) {
856        self.tables.remove(name);
857    }
858
859    pub fn get_table(&self, name: &str) -> Option<&TableCatalog> {
860        self.tables.get(name)
861    }
862}
863
864#[derive(Error, Debug)]
865pub enum CheckRelationError {
866    #[error("{0}")]
867    Resolve(#[from] ResolveQualifiedNameError),
868    #[error("{0}")]
869    Catalog(#[from] CatalogError),
870}
871
872impl From<CheckRelationError> for RwError {
873    fn from(e: CheckRelationError) -> Self {
874        match e {
875            CheckRelationError::Resolve(e) => e.into(),
876            CheckRelationError::Catalog(e) => e.into(),
877        }
878    }
879}
880
881impl SessionImpl {
882    pub(crate) fn new(
883        env: FrontendEnv,
884        auth_context: AuthContext,
885        user_authenticator: UserAuthenticator,
886        id: SessionId,
887        peer_addr: AddressRef,
888        session_config: SessionConfig,
889    ) -> Self {
890        let cursor_metrics = env.cursor_metrics.clone();
891        let (notice_tx, notice_rx) = mpsc::unbounded_channel();
892
893        Self {
894            env,
895            auth_context: Arc::new(RwLock::new(auth_context)),
896            user_authenticator,
897            config_map: Arc::new(RwLock::new(session_config)),
898            id,
899            peer_addr,
900            txn: Default::default(),
901            current_query_cancel_flag: Mutex::new(None),
902            notice_tx,
903            notice_rx: Mutex::new(notice_rx),
904            exec_context: Mutex::new(None),
905            last_idle_instant: Default::default(),
906            cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
907            temporary_source_manager: Default::default(),
908            staging_catalog_manager: Default::default(),
909        }
910    }
911
912    #[cfg(test)]
913    pub fn mock() -> Self {
914        let env = FrontendEnv::mock();
915        let (notice_tx, notice_rx) = mpsc::unbounded_channel();
916
917        Self {
918            env: FrontendEnv::mock(),
919            auth_context: Arc::new(RwLock::new(AuthContext::new(
920                DEFAULT_DATABASE_NAME.to_owned(),
921                DEFAULT_SUPER_USER.to_owned(),
922                DEFAULT_SUPER_USER_ID,
923            ))),
924            user_authenticator: UserAuthenticator::None,
925            config_map: Default::default(),
926            // Mock session use non-sense id.
927            id: (0, 0),
928            txn: Default::default(),
929            current_query_cancel_flag: Mutex::new(None),
930            notice_tx,
931            notice_rx: Mutex::new(notice_rx),
932            exec_context: Mutex::new(None),
933            peer_addr: Address::Tcp(SocketAddr::new(
934                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
935                8080,
936            ))
937            .into(),
938            last_idle_instant: Default::default(),
939            cursor_manager: Arc::new(CursorManager::new(env.cursor_metrics)),
940            temporary_source_manager: Default::default(),
941            staging_catalog_manager: Default::default(),
942        }
943    }
944
945    pub(crate) fn env(&self) -> &FrontendEnv {
946        &self.env
947    }
948
949    pub fn auth_context(&self) -> Arc<AuthContext> {
950        let ctx = self.auth_context.read();
951        Arc::new(ctx.clone())
952    }
953
954    pub fn database(&self) -> String {
955        self.auth_context.read().database.clone()
956    }
957
958    pub fn database_id(&self) -> DatabaseId {
959        let db_name = self.database();
960        self.env
961            .catalog_reader()
962            .read_guard()
963            .get_database_by_name(&db_name)
964            .map(|db| db.id())
965            .expect("session database not found")
966    }
967
968    pub fn user_name(&self) -> String {
969        self.auth_context.read().user_name.clone()
970    }
971
972    pub fn user_id(&self) -> UserId {
973        self.auth_context.read().user_id
974    }
975
976    pub fn update_database(&self, database: String) {
977        self.auth_context.write().database = database;
978    }
979
980    pub fn shared_config(&self) -> Arc<RwLock<SessionConfig>> {
981        Arc::clone(&self.config_map)
982    }
983
984    pub fn config(&self) -> RwLockReadGuard<'_, SessionConfig> {
985        self.config_map.read()
986    }
987
988    pub fn set_config(&self, key: &str, value: String) -> Result<String> {
989        self.config_map
990            .write()
991            .set(key, value, &mut ())
992            .map_err(Into::into)
993    }
994
995    pub fn reset_config(&self, key: &str) -> Result<String> {
996        self.config_map
997            .write()
998            .reset(key, &mut ())
999            .map_err(Into::into)
1000    }
1001
1002    pub fn set_config_report(
1003        &self,
1004        key: &str,
1005        value: Option<String>,
1006        mut reporter: impl ConfigReporter,
1007    ) -> Result<String> {
1008        if let Some(value) = value {
1009            self.config_map
1010                .write()
1011                .set(key, value, &mut reporter)
1012                .map_err(Into::into)
1013        } else {
1014            self.config_map
1015                .write()
1016                .reset(key, &mut reporter)
1017                .map_err(Into::into)
1018        }
1019    }
1020
1021    pub fn session_id(&self) -> SessionId {
1022        self.id
1023    }
1024
1025    pub fn running_sql(&self) -> Option<Arc<str>> {
1026        self.exec_context
1027            .lock()
1028            .as_ref()
1029            .and_then(|weak| weak.upgrade())
1030            .map(|context| context.running_sql.clone())
1031    }
1032
1033    pub fn get_cursor_manager(&self) -> Arc<CursorManager> {
1034        self.cursor_manager.clone()
1035    }
1036
1037    pub fn peer_addr(&self) -> &Address {
1038        &self.peer_addr
1039    }
1040
1041    pub fn elapse_since_running_sql(&self) -> Option<u128> {
1042        self.exec_context
1043            .lock()
1044            .as_ref()
1045            .and_then(|weak| weak.upgrade())
1046            .map(|context| context.last_instant.elapsed().as_millis())
1047    }
1048
1049    pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
1050        self.last_idle_instant
1051            .lock()
1052            .as_ref()
1053            .map(|x| x.elapsed().as_millis())
1054    }
1055
1056    pub fn check_relation_name_duplicated(
1057        &self,
1058        name: ObjectName,
1059        stmt_type: StatementType,
1060        if_not_exists: bool,
1061    ) -> std::result::Result<Either<(), RwPgResponse>, CheckRelationError> {
1062        let db_name = &self.database();
1063        let catalog_reader = self.env().catalog_reader().read_guard();
1064        let (schema_name, relation_name) = {
1065            let (schema_name, relation_name) =
1066                Binder::resolve_schema_qualified_name(db_name, &name)?;
1067            let search_path = self.config().search_path();
1068            let user_name = &self.user_name();
1069            let schema_name = match schema_name {
1070                Some(schema_name) => schema_name,
1071                None => catalog_reader
1072                    .first_valid_schema(db_name, &search_path, user_name)?
1073                    .name(),
1074            };
1075            (schema_name, relation_name)
1076        };
1077        match catalog_reader.check_relation_name_duplicated(db_name, &schema_name, &relation_name) {
1078            Err(e) if if_not_exists => {
1079                if let CatalogErrorInner::Duplicated {
1080                    name,
1081                    under_creation,
1082                    ..
1083                } = e.inner()
1084                {
1085                    // If relation is created, return directly.
1086                    // Otherwise, the job status is `is_creating`. Since frontend receives the catalog asynchronously, we can't
1087                    // determine the real status of the meta at this time. We regard it as `not_exists` and delay the check to meta.
1088                    // Only the type in StreamingJob (defined in streaming_job.rs) and Subscription may be `is_creating`.
1089                    if !*under_creation {
1090                        Ok(Either::Right(
1091                            PgResponse::builder(stmt_type)
1092                                .notice(format!("relation \"{}\" already exists, skipping", name))
1093                                .into(),
1094                        ))
1095                    } else if stmt_type == StatementType::CREATE_SUBSCRIPTION {
1096                        // For now, when a Subscription is creating, we return directly with an additional message.
1097                        // TODO: Subscription should also be processed in the same way as StreamingJob.
1098                        Ok(Either::Right(
1099                            PgResponse::builder(stmt_type)
1100                                .notice(format!(
1101                                    "relation \"{}\" already exists but still creating, skipping",
1102                                    name
1103                                ))
1104                                .into(),
1105                        ))
1106                    } else {
1107                        Ok(Either::Left(()))
1108                    }
1109                } else {
1110                    Err(e.into())
1111                }
1112            }
1113            Err(e) => Err(e.into()),
1114            Ok(_) => Ok(Either::Left(())),
1115        }
1116    }
1117
1118    pub fn check_secret_name_duplicated(&self, name: ObjectName) -> Result<()> {
1119        let db_name = &self.database();
1120        let catalog_reader = self.env().catalog_reader().read_guard();
1121        let (schema_name, secret_name) = {
1122            let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1123            let search_path = self.config().search_path();
1124            let user_name = &self.user_name();
1125            let schema_name = match schema_name {
1126                Some(schema_name) => schema_name,
1127                None => catalog_reader
1128                    .first_valid_schema(db_name, &search_path, user_name)?
1129                    .name(),
1130            };
1131            (schema_name, secret_name)
1132        };
1133        catalog_reader
1134            .check_secret_name_duplicated(db_name, &schema_name, &secret_name)
1135            .map_err(RwError::from)
1136    }
1137
1138    pub fn check_connection_name_duplicated(&self, name: ObjectName) -> Result<()> {
1139        let db_name = &self.database();
1140        let catalog_reader = self.env().catalog_reader().read_guard();
1141        let (schema_name, connection_name) = {
1142            let (schema_name, connection_name) =
1143                Binder::resolve_schema_qualified_name(db_name, &name)?;
1144            let search_path = self.config().search_path();
1145            let user_name = &self.user_name();
1146            let schema_name = match schema_name {
1147                Some(schema_name) => schema_name,
1148                None => catalog_reader
1149                    .first_valid_schema(db_name, &search_path, user_name)?
1150                    .name(),
1151            };
1152            (schema_name, connection_name)
1153        };
1154        catalog_reader
1155            .check_connection_name_duplicated(db_name, &schema_name, &connection_name)
1156            .map_err(RwError::from)
1157    }
1158
1159    pub fn check_function_name_duplicated(
1160        &self,
1161        stmt_type: StatementType,
1162        name: ObjectName,
1163        arg_types: &[DataType],
1164        if_not_exists: bool,
1165    ) -> Result<Either<(), RwPgResponse>> {
1166        let db_name = &self.database();
1167        let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1168        let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?;
1169
1170        let catalog_reader = self.env().catalog_reader().read_guard();
1171        if catalog_reader
1172            .get_schema_by_id(database_id, schema_id)?
1173            .get_function_by_name_args(&function_name, arg_types)
1174            .is_some()
1175        {
1176            let full_name = format!(
1177                "{function_name}({})",
1178                arg_types.iter().map(|t| t.to_string()).join(",")
1179            );
1180            if if_not_exists {
1181                Ok(Either::Right(
1182                    PgResponse::builder(stmt_type)
1183                        .notice(format!(
1184                            "function \"{}\" already exists, skipping",
1185                            full_name
1186                        ))
1187                        .into(),
1188                ))
1189            } else {
1190                Err(CatalogError::duplicated("function", full_name).into())
1191            }
1192        } else {
1193            Ok(Either::Left(()))
1194        }
1195    }
1196
1197    /// Also check if the user has the privilege to create in the schema.
1198    pub fn get_database_and_schema_id_for_create(
1199        &self,
1200        schema_name: Option<String>,
1201    ) -> Result<(DatabaseId, SchemaId)> {
1202        let db_name = &self.database();
1203
1204        let search_path = self.config().search_path();
1205        let user_name = &self.user_name();
1206
1207        let catalog_reader = self.env().catalog_reader().read_guard();
1208        let schema = match schema_name {
1209            Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
1210            None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
1211        };
1212        let schema_name = schema.name();
1213
1214        check_schema_writable(&schema_name)?;
1215        self.check_privileges(&[ObjectCheckItem::new(
1216            schema.owner(),
1217            AclMode::Create,
1218            schema_name,
1219            schema.id(),
1220        )])?;
1221
1222        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1223        Ok((db_id, schema.id()))
1224    }
1225
1226    pub fn get_connection_by_name(
1227        &self,
1228        schema_name: Option<String>,
1229        connection_name: &str,
1230    ) -> Result<Arc<ConnectionCatalog>> {
1231        let db_name = &self.database();
1232        let search_path = self.config().search_path();
1233        let user_name = &self.user_name();
1234
1235        let catalog_reader = self.env().catalog_reader().read_guard();
1236        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1237        let (connection, _) =
1238            catalog_reader.get_connection_by_name(db_name, schema_path, connection_name)?;
1239
1240        self.check_privileges(&[ObjectCheckItem::new(
1241            connection.owner(),
1242            AclMode::Usage,
1243            connection.name.clone(),
1244            connection.id,
1245        )])?;
1246
1247        Ok(connection.clone())
1248    }
1249
1250    pub fn get_subscription_by_schema_id_name(
1251        &self,
1252        schema_id: SchemaId,
1253        subscription_name: &str,
1254    ) -> Result<Arc<SubscriptionCatalog>> {
1255        let db_name = &self.database();
1256
1257        let catalog_reader = self.env().catalog_reader().read_guard();
1258        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1259        let schema = catalog_reader.get_schema_by_id(db_id, schema_id)?;
1260        let subscription = schema
1261            .get_subscription_by_name(subscription_name)
1262            .ok_or_else(|| {
1263                RwError::from(ErrorCode::ItemNotFound(format!(
1264                    "subscription {} not found",
1265                    subscription_name
1266                )))
1267            })?;
1268        Ok(subscription.clone())
1269    }
1270
1271    pub fn get_subscription_by_name(
1272        &self,
1273        schema_name: Option<String>,
1274        subscription_name: &str,
1275    ) -> Result<Arc<SubscriptionCatalog>> {
1276        let db_name = &self.database();
1277        let search_path = self.config().search_path();
1278        let user_name = &self.user_name();
1279
1280        let catalog_reader = self.env().catalog_reader().read_guard();
1281        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1282        let (subscription, _) =
1283            catalog_reader.get_subscription_by_name(db_name, schema_path, subscription_name)?;
1284        Ok(subscription.clone())
1285    }
1286
1287    pub fn get_table_by_id(&self, table_id: TableId) -> Result<Arc<TableCatalog>> {
1288        let catalog_reader = self.env().catalog_reader().read_guard();
1289        Ok(catalog_reader.get_any_table_by_id(table_id)?.clone())
1290    }
1291
1292    pub fn get_table_by_name(
1293        &self,
1294        table_name: &str,
1295        db_id: DatabaseId,
1296        schema_id: SchemaId,
1297    ) -> Result<Arc<TableCatalog>> {
1298        let catalog_reader = self.env().catalog_reader().read_guard();
1299        let table = catalog_reader
1300            .get_schema_by_id(db_id, schema_id)?
1301            .get_created_table_by_name(table_name)
1302            .ok_or_else(|| {
1303                Error::new(
1304                    ErrorKind::InvalidInput,
1305                    format!("table \"{}\" does not exist", table_name),
1306                )
1307            })?;
1308
1309        self.check_privileges(&[ObjectCheckItem::new(
1310            table.owner(),
1311            AclMode::Select,
1312            table_name.to_owned(),
1313            table.id,
1314        )])?;
1315
1316        Ok(table.clone())
1317    }
1318
1319    pub fn get_secret_by_name(
1320        &self,
1321        schema_name: Option<String>,
1322        secret_name: &str,
1323    ) -> Result<Arc<SecretCatalog>> {
1324        let db_name = &self.database();
1325        let search_path = self.config().search_path();
1326        let user_name = &self.user_name();
1327
1328        let catalog_reader = self.env().catalog_reader().read_guard();
1329        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1330        let (secret, _) = catalog_reader.get_secret_by_name(db_name, schema_path, secret_name)?;
1331
1332        self.check_privileges(&[ObjectCheckItem::new(
1333            secret.owner(),
1334            AclMode::Usage,
1335            secret.name.clone(),
1336            secret.id,
1337        )])?;
1338
1339        Ok(secret.clone())
1340    }
1341
1342    pub async fn list_change_log_epochs(
1343        &self,
1344        table_id: TableId,
1345        min_epoch: u64,
1346        max_count: u32,
1347    ) -> Result<Vec<u64>> {
1348        let Some(max_epoch) = self
1349            .env
1350            .hummock_snapshot_manager()
1351            .acquire()
1352            .version()
1353            .state_table_info
1354            .info()
1355            .get(&table_id)
1356            .map(|s| s.committed_epoch)
1357        else {
1358            return Ok(vec![]);
1359        };
1360        let ret = self
1361            .env
1362            .meta_client_ref()
1363            .get_hummock_table_change_log(
1364                Some(min_epoch),
1365                Some(max_epoch),
1366                Some(iter::once(table_id).collect()),
1367                true,
1368                Some(max_count),
1369            )
1370            .await?;
1371        let Some(e) = ret.get(&table_id) else {
1372            return Ok(vec![]);
1373        };
1374        Ok(e.iter()
1375            .flat_map(|l| l.epochs())
1376            .filter(|e| *e >= min_epoch && *e <= max_epoch)
1377            .take(max_count as usize)
1378            .collect())
1379    }
1380
1381    pub fn clear_cancel_query_flag(&self) {
1382        let mut flag = self.current_query_cancel_flag.lock();
1383        *flag = None;
1384    }
1385
1386    pub fn reset_cancel_query_flag(&self) -> ShutdownToken {
1387        let mut flag = self.current_query_cancel_flag.lock();
1388        let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
1389        *flag = Some(shutdown_tx);
1390        shutdown_rx
1391    }
1392
1393    pub fn cancel_current_query(&self) {
1394        let mut flag_guard = self.current_query_cancel_flag.lock();
1395        if let Some(sender) = flag_guard.take() {
1396            info!("Trying to cancel query in local mode.");
1397            // Current running query is in local mode
1398            sender.cancel();
1399            info!("Cancel query request sent.");
1400        } else {
1401            info!("Trying to cancel query in distributed mode.");
1402            self.env.query_manager().cancel_queries_in_session(self.id)
1403        }
1404    }
1405
1406    pub fn cancel_current_creating_job(&self) {
1407        self.env.creating_streaming_job_tracker.abort_jobs(self.id);
1408    }
1409
1410    /// This function only used for test now.
1411    /// Maybe we can remove it in the future.
1412    pub async fn run_statement(
1413        self: Arc<Self>,
1414        sql: Arc<str>,
1415        formats: Vec<Format>,
1416    ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
1417        // Parse sql.
1418        let mut stmts = Parser::parse_sql(&sql)?;
1419        if stmts.is_empty() {
1420            return Ok(PgResponse::empty_result(
1421                pgwire::pg_response::StatementType::EMPTY,
1422            ));
1423        }
1424        if stmts.len() > 1 {
1425            return Ok(
1426                PgResponse::builder(pgwire::pg_response::StatementType::EMPTY)
1427                    .notice("cannot insert multiple commands into statement")
1428                    .into(),
1429            );
1430        }
1431        let stmt = stmts.swap_remove(0);
1432        let rsp = Box::pin(handle(self, stmt, sql.clone(), formats)).await?;
1433        Ok(rsp)
1434    }
1435
1436    pub fn notice_to_user(&self, str: impl Into<String>) {
1437        let notice = str.into();
1438        tracing::trace!(notice, "notice to user");
1439        self.notice_tx
1440            .send(notice)
1441            .expect("notice channel should not be closed");
1442    }
1443
1444    pub fn is_barrier_read(&self) -> bool {
1445        match self.config().visibility_mode() {
1446            VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
1447            VisibilityMode::All => true,
1448            VisibilityMode::Checkpoint => false,
1449        }
1450    }
1451
1452    pub fn statement_timeout(&self) -> Duration {
1453        if self.config().statement_timeout().millis() == 0 {
1454            Duration::from_secs(self.env.batch_config.statement_timeout_in_sec as u64)
1455        } else {
1456            Duration::from_millis(self.config().statement_timeout().millis() as u64)
1457        }
1458    }
1459
1460    pub fn create_temporary_source(&self, source: SourceCatalog) {
1461        self.temporary_source_manager
1462            .lock()
1463            .create_source(source.name.clone(), source);
1464    }
1465
1466    pub fn get_temporary_source(&self, name: &str) -> Option<SourceCatalog> {
1467        self.temporary_source_manager
1468            .lock()
1469            .get_source(name)
1470            .cloned()
1471    }
1472
1473    pub fn drop_temporary_source(&self, name: &str) {
1474        self.temporary_source_manager.lock().drop_source(name);
1475    }
1476
1477    pub fn temporary_source_manager(&self) -> TemporarySourceManager {
1478        self.temporary_source_manager.lock().clone()
1479    }
1480
1481    pub fn create_staging_table(&self, table: TableCatalog) {
1482        self.staging_catalog_manager
1483            .lock()
1484            .create_table(table.name.clone(), table);
1485    }
1486
1487    pub fn drop_staging_table(&self, name: &str) {
1488        self.staging_catalog_manager.lock().drop_table(name);
1489    }
1490
1491    pub fn staging_catalog_manager(&self) -> StagingCatalogManager {
1492        self.staging_catalog_manager.lock().clone()
1493    }
1494
1495    pub async fn check_cluster_limits(&self) -> Result<()> {
1496        if self.config().bypass_cluster_limits() {
1497            return Ok(());
1498        }
1499
1500        let gen_message = |ActorCountPerParallelism {
1501                               worker_id_to_actor_count,
1502                               hard_limit,
1503                               soft_limit,
1504                           }: ActorCountPerParallelism,
1505                           exceed_hard_limit: bool|
1506         -> String {
1507            let (limit_type, action) = if exceed_hard_limit {
1508                ("critical", "Scale the cluster immediately to proceed.")
1509            } else {
1510                (
1511                    "recommended",
1512                    "Consider scaling the cluster for optimal performance.",
1513                )
1514            };
1515            format!(
1516                r#"Actor count per parallelism exceeds the {limit_type} limit.
1517
1518Depending on your workload, this may overload the cluster and cause performance/stability issues. {action}
1519
1520HINT:
1521- For best practices on managing streaming jobs: https://docs.risingwave.com/operate/manage-a-large-number-of-streaming-jobs
1522- To bypass the check (if the cluster load is acceptable): `[ALTER SYSTEM] SET bypass_cluster_limits TO true`.
1523  See https://docs.risingwave.com/operate/view-configure-runtime-parameters#how-to-configure-runtime-parameters
1524- Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.
1525
1526DETAILS:
1527- hard limit: {hard_limit}
1528- soft limit: {soft_limit}
1529- worker_id_to_actor_count: {worker_id_to_actor_count:?}"#,
1530            )
1531        };
1532
1533        let limits = self.env().meta_client().get_cluster_limits().await?;
1534        for limit in limits {
1535            match limit {
1536                cluster_limit::ClusterLimit::ActorCount(l) => {
1537                    if l.exceed_hard_limit() {
1538                        return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
1539                            l, true,
1540                        ))));
1541                    } else if l.exceed_soft_limit() {
1542                        self.notice_to_user(gen_message(l, false));
1543                    }
1544                }
1545            }
1546        }
1547        Ok(())
1548    }
1549}
1550
1551pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
1552    std::sync::OnceLock::new();
1553
1554pub struct SessionManagerImpl {
1555    env: FrontendEnv,
1556    _join_handles: Vec<JoinHandle<()>>,
1557    _shutdown_senders: Vec<Sender<()>>,
1558    number: AtomicI32,
1559}
1560
1561impl SessionManager for SessionManagerImpl {
1562    type Error = RwError;
1563    type Session = SessionImpl;
1564
1565    fn create_dummy_session(&self, database_id: DatabaseId) -> Result<Arc<Self::Session>> {
1566        let dummy_addr = Address::Tcp(SocketAddr::new(
1567            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1568            5691, // port of meta
1569        ));
1570
1571        // Always use the built-in super user for dummy sessions.
1572        // This avoids permission checks tied to a specific user_id.
1573        self.connect_inner(
1574            database_id,
1575            risingwave_common::catalog::DEFAULT_SUPER_USER,
1576            Arc::new(dummy_addr),
1577        )
1578    }
1579
1580    fn connect(
1581        &self,
1582        database: &str,
1583        user_name: &str,
1584        peer_addr: AddressRef,
1585    ) -> Result<Arc<Self::Session>> {
1586        let catalog_reader = self.env.catalog_reader();
1587        let reader = catalog_reader.read_guard();
1588        let database_id = reader.get_database_by_name(database)?.id();
1589
1590        self.connect_inner(database_id, user_name, peer_addr)
1591    }
1592
1593    /// Used when cancel request happened.
1594    fn cancel_queries_in_session(&self, session_id: SessionId) {
1595        self.env.cancel_queries_in_session(session_id);
1596    }
1597
1598    fn cancel_creating_jobs_in_session(&self, session_id: SessionId) {
1599        self.env.cancel_creating_jobs_in_session(session_id);
1600    }
1601
1602    fn end_session(&self, session: &Self::Session) {
1603        self.delete_session(&session.session_id());
1604    }
1605
1606    async fn shutdown(&self) {
1607        // Clean up the session map.
1608        self.env.sessions_map().write().clear();
1609        // Unregister from the meta service.
1610        self.env.meta_client().try_unregister().await;
1611    }
1612}
1613
1614impl SessionManagerImpl {
1615    pub async fn new(opts: FrontendOpts) -> Result<Self> {
1616        // TODO(shutdown): only save join handles that **need** to be shutdown
1617        let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?;
1618        Ok(Self {
1619            env,
1620            _join_handles: join_handles,
1621            _shutdown_senders: shutdown_senders,
1622            number: AtomicI32::new(0),
1623        })
1624    }
1625
1626    pub(crate) fn env(&self) -> &FrontendEnv {
1627        &self.env
1628    }
1629
1630    fn insert_session(&self, session: Arc<SessionImpl>) {
1631        let active_sessions = {
1632            let mut write_guard = self.env.sessions_map.write();
1633            write_guard.insert(session.id(), session);
1634            write_guard.len()
1635        };
1636        self.env
1637            .frontend_metrics
1638            .active_sessions
1639            .set(active_sessions as i64);
1640    }
1641
1642    fn delete_session(&self, session_id: &SessionId) {
1643        let active_sessions = {
1644            let mut write_guard = self.env.sessions_map.write();
1645            write_guard.remove(session_id);
1646            write_guard.len()
1647        };
1648        self.env
1649            .frontend_metrics
1650            .active_sessions
1651            .set(active_sessions as i64);
1652    }
1653
1654    fn connect_inner(
1655        &self,
1656        database_id: DatabaseId,
1657        user_name: &str,
1658        peer_addr: AddressRef,
1659    ) -> Result<Arc<SessionImpl>> {
1660        let catalog_reader = self.env.catalog_reader();
1661        let reader = catalog_reader.read_guard();
1662        let (database_name, database_owner) = {
1663            let db = reader.get_database_by_id(database_id)?;
1664            (db.name(), db.owner())
1665        };
1666
1667        let user_reader = self.env.user_info_reader();
1668        let reader = user_reader.read_guard();
1669        if let Some(user) = reader.get_user_by_name(user_name) {
1670            if !user.can_login {
1671                bail_permission_denied!("User {} is not allowed to login", user_name);
1672            }
1673            let has_privilege = user.has_privilege(database_id, AclMode::Connect);
1674            if !user.is_super && database_owner != user.id && !has_privilege {
1675                bail_permission_denied!("User does not have CONNECT privilege.");
1676            }
1677
1678            // Check HBA configuration for LDAP authentication
1679            let (connection_type, client_addr) = match peer_addr.as_ref() {
1680                Address::Tcp(socket_addr) => (ConnectionType::Host, Some(&socket_addr.ip())),
1681                Address::Unix(_) => (ConnectionType::Local, None),
1682            };
1683            tracing::debug!(
1684                "receive connection: type={:?}, client_addr={:?}",
1685                connection_type,
1686                client_addr
1687            );
1688
1689            let hba_entry_opt = self.env.frontend_config().hba_config.find_matching_entry(
1690                &connection_type,
1691                database_name,
1692                user_name,
1693                client_addr,
1694            );
1695
1696            let Some(hba_entry_opt) = hba_entry_opt else {
1697                bail_permission_denied!(
1698                    "no pg_hba.conf entry for host \"{peer_addr}\", user \"{user_name}\", database \"{database_name}\""
1699                );
1700            };
1701
1702            // Determine the user authenticator based on the user's auth info.
1703            let authenticator_by_info = || -> Result<UserAuthenticator> {
1704                let authenticator = match &user.auth_info {
1705                    None => UserAuthenticator::None,
1706                    Some(auth_info) => match auth_info.encryption_type() {
1707                        EncryptionType::Plaintext => {
1708                            UserAuthenticator::ClearText(auth_info.encrypted_value.clone())
1709                        }
1710                        EncryptionType::Md5 => {
1711                            let mut salt = [0; 4];
1712                            let mut rng = rand::rng();
1713                            rng.fill_bytes(&mut salt);
1714                            UserAuthenticator::Md5WithSalt {
1715                                encrypted_password: md5_hash_with_salt(
1716                                    &auth_info.encrypted_value,
1717                                    &salt,
1718                                ),
1719                                salt,
1720                            }
1721                        }
1722                        EncryptionType::Oauth => UserAuthenticator::OAuth {
1723                            metadata: auth_info.metadata.clone(),
1724                            cluster_id: self.env.meta_client().cluster_id().to_owned(),
1725                        },
1726                        _ => {
1727                            bail_protocol_error!(
1728                                "Unsupported auth type: {}",
1729                                auth_info.encryption_type().as_str_name()
1730                            );
1731                        }
1732                    },
1733                };
1734                Ok(authenticator)
1735            };
1736
1737            let user_authenticator = match (&hba_entry_opt.auth_method, &user.auth_info) {
1738                (AuthMethod::Trust, _) => UserAuthenticator::None,
1739                // For backward compatibility, we allow password auth method to work with any auth info.
1740                (AuthMethod::Password, _) => authenticator_by_info()?,
1741                (AuthMethod::Md5, Some(auth_info))
1742                    if auth_info.encryption_type() == EncryptionType::Md5 =>
1743                {
1744                    authenticator_by_info()?
1745                }
1746                (AuthMethod::OAuth, Some(auth_info))
1747                    if auth_info.encryption_type() == EncryptionType::Oauth =>
1748                {
1749                    authenticator_by_info()?
1750                }
1751                (AuthMethod::Ldap, _) => {
1752                    UserAuthenticator::Ldap(user_name.to_owned(), hba_entry_opt.clone())
1753                }
1754                _ => {
1755                    bail_permission_denied!(
1756                        "password authentication failed for user \"{user_name}\""
1757                    );
1758                }
1759            };
1760
1761            // Assign a session id and insert into sessions map (for cancel request).
1762            let secret_key = self.number.fetch_add(1, Ordering::Relaxed);
1763            // Use a trivial strategy: process_id and secret_key are equal.
1764            let id = (secret_key, secret_key);
1765            // Read session params snapshot from frontend env.
1766            let session_config = self.env.session_params_snapshot();
1767
1768            let session_impl: Arc<SessionImpl> = SessionImpl::new(
1769                self.env.clone(),
1770                AuthContext::new(database_name.to_owned(), user_name.to_owned(), user.id),
1771                user_authenticator,
1772                id,
1773                peer_addr,
1774                session_config,
1775            )
1776            .into();
1777            self.insert_session(session_impl.clone());
1778
1779            Ok(session_impl)
1780        } else {
1781            bail_catalog_error!("Role {} does not exist", user_name);
1782        }
1783    }
1784}
1785
1786impl Session for SessionImpl {
1787    type Error = RwError;
1788    type Portal = Portal;
1789    type PreparedStatement = PrepareStatement;
1790    type ValuesStream = PgResponseStream;
1791
1792    /// A copy of `run_statement` but exclude the parser part so each run must be at most one
1793    /// statement. The str sql use the `to_string` of AST. Consider Reuse later.
1794    async fn run_one_query(
1795        self: Arc<Self>,
1796        stmt: Statement,
1797        format: Format,
1798    ) -> Result<PgResponse<PgResponseStream>> {
1799        let string = stmt.to_string();
1800        let sql_str = string.as_str();
1801        let sql: Arc<str> = Arc::from(sql_str);
1802        // The handle can be slow. Release potential large String early.
1803        drop(string);
1804        let rsp = Box::pin(handle(self, stmt, sql, vec![format])).await?;
1805        Ok(rsp)
1806    }
1807
1808    fn user_authenticator(&self) -> &UserAuthenticator {
1809        &self.user_authenticator
1810    }
1811
1812    fn id(&self) -> SessionId {
1813        self.id
1814    }
1815
1816    async fn parse(
1817        self: Arc<Self>,
1818        statement: Option<Statement>,
1819        params_types: Vec<Option<DataType>>,
1820    ) -> Result<PrepareStatement> {
1821        Ok(if let Some(statement) = statement {
1822            handle_parse(self, statement, params_types).await?
1823        } else {
1824            PrepareStatement::Empty
1825        })
1826    }
1827
1828    fn bind(
1829        self: Arc<Self>,
1830        prepare_statement: PrepareStatement,
1831        params: Vec<Option<Bytes>>,
1832        param_formats: Vec<Format>,
1833        result_formats: Vec<Format>,
1834    ) -> Result<Portal> {
1835        handle_bind(prepare_statement, params, param_formats, result_formats)
1836    }
1837
1838    async fn execute(self: Arc<Self>, portal: Portal) -> Result<PgResponse<PgResponseStream>> {
1839        let rsp = Box::pin(handle_execute(self, portal)).await?;
1840        Ok(rsp)
1841    }
1842
1843    fn describe_statement(
1844        self: Arc<Self>,
1845        prepare_statement: PrepareStatement,
1846    ) -> Result<(Vec<DataType>, Vec<PgFieldDescriptor>)> {
1847        Ok(match prepare_statement {
1848            PrepareStatement::Empty => (vec![], vec![]),
1849            PrepareStatement::Prepared(prepare_statement) => (
1850                prepare_statement.bound_result.param_types,
1851                infer(
1852                    Some(prepare_statement.bound_result.bound),
1853                    prepare_statement.statement,
1854                )?,
1855            ),
1856            PrepareStatement::PureStatement(statement) => (vec![], infer(None, statement)?),
1857        })
1858    }
1859
1860    fn describe_portal(self: Arc<Self>, portal: Portal) -> Result<Vec<PgFieldDescriptor>> {
1861        match portal {
1862            Portal::Empty => Ok(vec![]),
1863            Portal::Portal(portal) => {
1864                let mut columns = infer(Some(portal.bound_result.bound), portal.statement)?;
1865                let formats = FormatIterator::new(&portal.result_formats, columns.len())
1866                    .map_err(|e| RwError::from(ErrorCode::ProtocolError(e)))?;
1867                columns.iter_mut().zip_eq_fast(formats).for_each(|(c, f)| {
1868                    if f == Format::Binary {
1869                        c.set_to_binary()
1870                    }
1871                });
1872                Ok(columns)
1873            }
1874            Portal::PureStatement(statement) => Ok(infer(None, statement)?),
1875        }
1876    }
1877
1878    fn get_config(&self, key: &str) -> Result<String> {
1879        self.config().get(key).map_err(Into::into)
1880    }
1881
1882    fn set_config(&self, key: &str, value: String) -> Result<String> {
1883        Self::set_config(self, key, value)
1884    }
1885
1886    async fn next_notice(self: &Arc<Self>) -> String {
1887        std::future::poll_fn(|cx| self.clone().notice_rx.lock().poll_recv(cx))
1888            .await
1889            .expect("notice channel should not be closed")
1890    }
1891
1892    fn transaction_status(&self) -> TransactionStatus {
1893        match &*self.txn.lock() {
1894            transaction::State::Initial | transaction::State::Implicit(_) => {
1895                TransactionStatus::Idle
1896            }
1897            transaction::State::Explicit(_) => TransactionStatus::InTransaction,
1898            // TODO: failed transaction
1899        }
1900    }
1901
1902    /// Init and return an `ExecContextGuard` which could be used as a guard to represent the execution flow.
1903    fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard {
1904        let exec_context = Arc::new(ExecContext {
1905            running_sql: sql,
1906            last_instant: Instant::now(),
1907            last_idle_instant: self.last_idle_instant.clone(),
1908        });
1909        *self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
1910        // unset idle state, since there is a sql running
1911        *self.last_idle_instant.lock() = None;
1912        ExecContextGuard::new(exec_context)
1913    }
1914
1915    /// Check whether idle transaction timeout.
1916    /// If yes, unpin snapshot and return an `IdleInTxnTimeout` error.
1917    fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
1918        // In transaction.
1919        if matches!(self.transaction_status(), TransactionStatus::InTransaction) {
1920            let idle_in_transaction_session_timeout =
1921                self.config().idle_in_transaction_session_timeout() as u128;
1922            // Idle transaction timeout has been enabled.
1923            if idle_in_transaction_session_timeout != 0 {
1924                // Hold the `exec_context` lock to ensure no new sql coming when unpin_snapshot.
1925                let guard = self.exec_context.lock();
1926                // No running sql i.e. idle
1927                if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() {
1928                    // Idle timeout.
1929                    if let Some(elapse_since_last_idle_instant) =
1930                        self.elapse_since_last_idle_instant()
1931                        && elapse_since_last_idle_instant > idle_in_transaction_session_timeout
1932                    {
1933                        return Err(PsqlError::IdleInTxnTimeout);
1934                    }
1935                }
1936            }
1937        }
1938        Ok(())
1939    }
1940
1941    fn user(&self) -> String {
1942        self.user_name()
1943    }
1944}
1945
1946/// Returns row description of the statement
1947fn infer(bound: Option<BoundStatement>, stmt: Statement) -> Result<Vec<PgFieldDescriptor>> {
1948    match stmt {
1949        Statement::Query(_)
1950        | Statement::Insert { .. }
1951        | Statement::Delete { .. }
1952        | Statement::Update { .. }
1953        | Statement::FetchCursor { .. } => Ok(bound
1954            .unwrap()
1955            .output_fields()
1956            .iter()
1957            .map(to_pg_field)
1958            .collect()),
1959        Statement::ShowObjects {
1960            object: show_object,
1961            ..
1962        } => Ok(infer_show_object(&show_object)),
1963        Statement::ShowCreateObject { .. } => Ok(infer_show_create_object()),
1964        Statement::ShowTransactionIsolationLevel => {
1965            let name = "transaction_isolation";
1966            Ok(infer_show_variable(name))
1967        }
1968        Statement::ShowVariable { variable } => {
1969            let name = &variable[0].real_value().to_lowercase();
1970            Ok(infer_show_variable(name))
1971        }
1972        Statement::Describe { name: _, kind } => Ok(infer_describe(&kind)),
1973        Statement::Explain { .. } => Ok(vec![PgFieldDescriptor::new(
1974            "QUERY PLAN".to_owned(),
1975            DataType::Varchar.to_oid(),
1976            DataType::Varchar.type_len(),
1977        )]),
1978        _ => Ok(vec![]),
1979    }
1980}
1981
1982pub struct WorkerProcessId {
1983    pub worker_id: WorkerId,
1984    pub process_id: i32,
1985}
1986
1987impl WorkerProcessId {
1988    pub fn new(worker_id: WorkerId, process_id: i32) -> Self {
1989        Self {
1990            worker_id,
1991            process_id,
1992        }
1993    }
1994}
1995
1996impl Display for WorkerProcessId {
1997    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1998        write!(f, "{}:{}", self.worker_id, self.process_id)
1999    }
2000}
2001
2002impl TryFrom<String> for WorkerProcessId {
2003    type Error = String;
2004
2005    fn try_from(worker_process_id: String) -> std::result::Result<Self, Self::Error> {
2006        const INVALID: &str = "invalid WorkerProcessId";
2007        let splits: Vec<&str> = worker_process_id.split(":").collect();
2008        if splits.len() != 2 {
2009            return Err(INVALID.to_owned());
2010        }
2011        let Ok(worker_id) = splits[0].parse::<u32>() else {
2012            return Err(INVALID.to_owned());
2013        };
2014        let Ok(process_id) = splits[1].parse::<i32>() else {
2015            return Err(INVALID.to_owned());
2016        };
2017        Ok(WorkerProcessId::new(worker_id.into(), process_id))
2018    }
2019}
2020
2021pub fn cancel_queries_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2022    let guard = sessions_map.read();
2023    if let Some(session) = guard.get(&session_id) {
2024        session.cancel_current_query();
2025        true
2026    } else {
2027        info!("Current session finished, ignoring cancel query request");
2028        false
2029    }
2030}
2031
2032pub fn cancel_creating_jobs_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2033    let guard = sessions_map.read();
2034    if let Some(session) = guard.get(&session_id) {
2035        session.cancel_current_creating_job();
2036        true
2037    } else {
2038        info!("Current session finished, ignoring cancel creating request");
2039        false
2040    }
2041}