risingwave_frontend/
session.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::io::{Error, ErrorKind};
18use std::iter;
19use std::net::{IpAddr, Ipv4Addr, SocketAddr};
20use std::sync::atomic::{AtomicI32, Ordering};
21use std::sync::{Arc, Weak};
22use std::time::{Duration, Instant};
23
24use anyhow::anyhow;
25use bytes::Bytes;
26use either::Either;
27use itertools::Itertools;
28use parking_lot::{Mutex, RwLock, RwLockReadGuard};
29use pgwire::error::{PsqlError, PsqlResult};
30use pgwire::net::{Address, AddressRef};
31use pgwire::pg_field_descriptor::PgFieldDescriptor;
32use pgwire::pg_message::TransactionStatus;
33use pgwire::pg_response::{PgResponse, StatementType};
34use pgwire::pg_server::{
35    BoxedError, ExecContext, ExecContextGuard, Session, SessionId, SessionManager,
36    UserAuthenticator,
37};
38use pgwire::types::{Format, FormatIterator};
39use prometheus_http_query::Client as PrometheusClient;
40use rand::RngCore;
41use risingwave_batch::monitor::{BatchSpillMetrics, GLOBAL_BATCH_SPILL_METRICS};
42use risingwave_batch::spill::spill_op::SpillOp;
43use risingwave_batch::task::{ShutdownSender, ShutdownToken};
44use risingwave_batch::worker_manager::worker_node_manager::{
45    WorkerNodeManager, WorkerNodeManagerRef,
46};
47use risingwave_common::acl::AclMode;
48#[cfg(test)]
49use risingwave_common::catalog::{
50    DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
51};
52use risingwave_common::config::{
53    AuthMethod, BatchConfig, ConnectionType, FrontendConfig, MetaConfig, MetricLevel,
54    RpcClientConfig, StreamingConfig, UdfConfig, load_config,
55};
56use risingwave_common::id::WorkerId;
57use risingwave_common::memory::MemoryContext;
58use risingwave_common::secret::LocalSecretManager;
59use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
60use risingwave_common::system_param::local_manager::{
61    LocalSystemParamsManager, LocalSystemParamsManagerRef,
62};
63use risingwave_common::telemetry::manager::TelemetryManager;
64use risingwave_common::telemetry::telemetry_env_enabled;
65use risingwave_common::types::DataType;
66use risingwave_common::util::addr::HostAddr;
67use risingwave_common::util::cluster_limit;
68use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
69use risingwave_common::util::iter_util::ZipEqFast;
70use risingwave_common::util::pretty_bytes::convert;
71use risingwave_common::util::runtime::BackgroundShutdownRuntime;
72use risingwave_common::{GIT_SHA, RW_VERSION};
73use risingwave_common_heap_profiling::HeapProfiler;
74use risingwave_common_service::{MetricsManager, ObserverManager};
75use risingwave_connector::source::monitor::{GLOBAL_SOURCE_METRICS, SourceMetrics};
76use risingwave_pb::common::WorkerType;
77use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
78use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
79use risingwave_pb::health::health_server::HealthServer;
80use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
81use risingwave_pb::user::auth_info::EncryptionType;
82use risingwave_rpc_client::{
83    ComputeClientPool, ComputeClientPoolRef, FrontendClientPool, FrontendClientPoolRef, MetaClient,
84    MonitorClientPool, MonitorClientPoolRef,
85};
86use risingwave_sqlparser::ast::{ObjectName, Statement};
87use risingwave_sqlparser::parser::Parser;
88use thiserror::Error;
89use thiserror_ext::AsReport;
90use tokio::runtime::Builder;
91use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
92use tokio::sync::oneshot::Sender;
93use tokio::sync::watch;
94use tokio::task::JoinHandle;
95use tracing::{error, info};
96
97use self::cursor_manager::CursorManager;
98use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError};
99use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl};
100use crate::catalog::connection_catalog::ConnectionCatalog;
101use crate::catalog::root_catalog::{Catalog, SchemaPath};
102use crate::catalog::secret_catalog::SecretCatalog;
103use crate::catalog::source_catalog::SourceCatalog;
104use crate::catalog::subscription_catalog::SubscriptionCatalog;
105use crate::catalog::{
106    CatalogError, CatalogErrorInner, DatabaseId, OwnedByUserCatalog, SchemaId, TableId,
107    check_schema_writable,
108};
109use crate::error::{
110    ErrorCode, Result, RwError, bail_catalog_error, bail_permission_denied, bail_protocol_error,
111};
112use crate::handler::describe::infer_describe;
113use crate::handler::extended_handle::{
114    Portal, PrepareStatement, handle_bind, handle_execute, handle_parse,
115};
116use crate::handler::privilege::ObjectCheckItem;
117use crate::handler::show::{infer_show_create_object, infer_show_object};
118use crate::handler::util::to_pg_field;
119use crate::handler::variable::infer_show_variable;
120use crate::handler::{RwPgResponse, handle};
121use crate::health_service::HealthServiceImpl;
122use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl};
123use crate::monitor::{CursorMetrics, FrontendMetrics, GLOBAL_FRONTEND_METRICS};
124use crate::observer::FrontendObserverNode;
125use crate::rpc::{FrontendServiceImpl, MonitorServiceImpl};
126use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef};
127use crate::scheduler::{
128    DistributedQueryMetrics, GLOBAL_DISTRIBUTED_QUERY_METRICS, HummockSnapshotManager,
129    HummockSnapshotManagerRef, QueryManager,
130};
131use crate::telemetry::FrontendTelemetryCreator;
132use crate::user::UserId;
133use crate::user::user_authentication::md5_hash_with_salt;
134use crate::user::user_manager::UserInfoManager;
135use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl};
136use crate::{FrontendOpts, PgResponseStream, TableCatalog};
137
138pub(crate) mod current;
139pub(crate) mod cursor_manager;
140pub(crate) mod transaction;
141
/// The global environment for the frontend server.
///
/// Built once by [`FrontendEnv::init`] (or [`FrontendEnv::mock`]). The type is `Clone`; most
/// members are `Arc`-backed handles.
#[derive(Clone)]
pub(crate) struct FrontendEnv {
    /// Client used to talk to the meta service.
    meta_client: Arc<dyn FrontendMetaClient>,
    // Different sessions may access the catalog at the same time, so the catalog shared by the
    // writer and the reader is protected by a `RwLock`.
    catalog_writer: Arc<dyn CatalogWriter>,
    catalog_reader: CatalogReader,
    // Writer/reader pair over the shared `UserInfoManager`.
    user_info_writer: Arc<dyn UserInfoWriter>,
    user_info_reader: UserInfoReader,
    worker_node_manager: WorkerNodeManagerRef,
    query_manager: QueryManager,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
    system_params_manager: LocalSystemParamsManagerRef,
    /// Session parameters shared with the observer node; read through
    /// `session_params_snapshot`.
    session_params: Arc<RwLock<SessionConfig>>,

    /// Address of this frontend; `init` stores the advertise address here.
    server_addr: HostAddr,
    /// Pool of compute-node clients (in `init`, the same pool handed to the query manager).
    client_pool: ComputeClientPoolRef,
    frontend_client_pool: FrontendClientPoolRef,
    monitor_client_pool: MonitorClientPoolRef,

    /// Each session is identified by (`process_id`,
    /// `secret_key`). When Cancel Request received, find corresponding session and cancel all
    /// running queries.
    sessions_map: SessionMapRef,

    /// Frontend metrics (the global instance in `init`, a test instance in `mock`).
    pub frontend_metrics: Arc<FrontendMetrics>,

    /// Cursor metrics; in `init` they are created from the sessions map.
    pub cursor_metrics: Arc<CursorMetrics>,

    /// Source (connector) metrics.
    source_metrics: Arc<SourceMetrics>,

    /// Batch spill metrics
    spill_metrics: Arc<BatchSpillMetrics>,

    // Sections of the loaded node configuration.
    batch_config: BatchConfig,
    frontend_config: FrontendConfig,
    #[expect(dead_code)]
    meta_config: MetaConfig,
    streaming_config: StreamingConfig,
    udf_config: UdfConfig,

    /// Track creating streaming jobs, used to cancel creating streaming job when cancel request
    /// received.
    creating_streaming_job_tracker: StreamingJobTrackerRef,

    /// Runtime for compute intensive tasks in frontend, e.g. executors in local mode,
    /// root stage in mpp mode.
    compute_runtime: Arc<BackgroundShutdownRuntime>,

    /// Memory context used for batch executors in frontend.
    mem_context: MemoryContext,

    #[cfg(feature = "datafusion")]
    /// Shared budget context for **DataFusion** spillable operators.
    /// Created by [`crate::datafusion::execute::memory_ctx::create_df_spillable_budget_ctx`];
    /// caps total spillable usage so unspillable operators always have headroom.
    /// Not used by the RisingWave batch engine.
    df_spillable_budget_ctx: MemoryContext,

    /// Address of the serverless backfill controller.
    serverless_backfill_controller_addr: String,

    /// Prometheus client for querying metrics; `None` when no endpoint is configured or the
    /// client could not be constructed.
    prometheus_client: Option<PrometheusClient>,

    /// The additional selector used when querying Prometheus.
    prometheus_selector: String,
}
211
/// Shared, lock-protected map of live sessions, keyed by `(process_id, secret_key)`.
pub type SessionMapRef = Arc<RwLock<HashMap<(i32, i32), Arc<SessionImpl>>>>;
214
/// The proportion of frontend memory used for batch processing.
///
/// `FrontendEnv::init` multiplies the node's total memory by this factor to obtain the limit of
/// the root batch `MemoryContext`.
const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5;
217
218impl FrontendEnv {
219    pub fn mock() -> Self {
220        use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter};
221
222        let catalog = Arc::new(RwLock::new(Catalog::default()));
223        let meta_client = Arc::new(MockFrontendMetaClient {});
224        let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
225        let catalog_writer = Arc::new(MockCatalogWriter::new(
226            catalog.clone(),
227            hummock_snapshot_manager.clone(),
228        ));
229        let catalog_reader = CatalogReader::new(catalog);
230        let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
231        let user_info_writer = Arc::new(MockUserInfoWriter::new(user_info_manager.clone()));
232        let user_info_reader = UserInfoReader::new(user_info_manager);
233        let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
234        let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
235        let compute_client_pool = Arc::new(ComputeClientPool::for_test());
236        let frontend_client_pool = Arc::new(FrontendClientPool::for_test());
237        let monitor_client_pool = Arc::new(MonitorClientPool::for_test());
238        let query_manager = QueryManager::new(
239            worker_node_manager.clone(),
240            compute_client_pool,
241            catalog_reader.clone(),
242            Arc::new(DistributedQueryMetrics::for_test()),
243            None,
244            None,
245        );
246        let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
247        let client_pool = Arc::new(ComputeClientPool::for_test());
248        let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
249        let runtime = {
250            let mut builder = Builder::new_multi_thread();
251            if let Some(frontend_compute_runtime_worker_threads) =
252                load_config("", FrontendOpts::default())
253                    .batch
254                    .frontend_compute_runtime_worker_threads
255            {
256                builder.worker_threads(frontend_compute_runtime_worker_threads);
257            }
258            builder
259                .thread_name("rw-batch-local")
260                .enable_all()
261                .build()
262                .unwrap()
263        };
264        let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
265        let sessions_map = Arc::new(RwLock::new(HashMap::new()));
266        Self {
267            meta_client,
268            catalog_writer,
269            catalog_reader,
270            user_info_writer,
271            user_info_reader,
272            worker_node_manager,
273            query_manager,
274            hummock_snapshot_manager,
275            system_params_manager,
276            session_params: Default::default(),
277            server_addr,
278            client_pool,
279            frontend_client_pool,
280            monitor_client_pool,
281            sessions_map,
282            frontend_metrics: Arc::new(FrontendMetrics::for_test()),
283            cursor_metrics: Arc::new(CursorMetrics::for_test()),
284            batch_config: BatchConfig::default(),
285            frontend_config: FrontendConfig::default(),
286            meta_config: MetaConfig::default(),
287            streaming_config: StreamingConfig::default(),
288            udf_config: UdfConfig::default(),
289            source_metrics: Arc::new(SourceMetrics::default()),
290            spill_metrics: BatchSpillMetrics::for_test(),
291            creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
292            compute_runtime,
293            mem_context: MemoryContext::none(),
294            #[cfg(feature = "datafusion")]
295            df_spillable_budget_ctx: MemoryContext::none(),
296            serverless_backfill_controller_addr: Default::default(),
297            prometheus_client: None,
298            prometheus_selector: String::new(),
299        }
300    }
301
    /// Boots the frontend node and builds its [`FrontendEnv`].
    ///
    /// The steps run in this order: load the config, register with meta and start the heartbeat
    /// loop, build the catalog / user / worker managers and client pools, start the observer
    /// manager, activate the worker in meta, start the metrics, telemetry and gRPC services,
    /// build the compute runtime, spawn the idle-transaction monitor, clean the spill directory,
    /// start the heap profiler and set up memory accounting and the Prometheus client.
    ///
    /// # Returns
    ///
    /// A tuple of:
    /// - the environment,
    /// - join handles of the heartbeat loop, the observer, telemetry (only when started) and the
    ///   idle-transaction monitor,
    /// - shutdown senders of the heartbeat loop and telemetry (only when started).
    ///
    /// # Errors
    ///
    /// Returns an error if activating the worker in meta fails, or if cleaning the spill
    /// directory fails.
    ///
    /// # Panics
    ///
    /// Panics if the advertise/listen address or `frontend_rpc_listener_addr` cannot be parsed,
    /// or if the compute runtime cannot be built.
    pub async fn init(opts: FrontendOpts) -> Result<(Self, Vec<JoinHandle<()>>, Vec<Sender<()>>)> {
        let config = load_config(&opts.config_path, &opts);
        info!("Starting frontend node");
        info!("> config: {:?}", config);
        info!(
            "> debug assertions: {}",
            if cfg!(debug_assertions) { "on" } else { "off" }
        );
        info!("> version: {} ({})", RW_VERSION, GIT_SHA);

        // The address advertised to the cluster; falls back to the listen address.
        let frontend_address: HostAddr = opts
            .advertise_addr
            .as_ref()
            .unwrap_or_else(|| {
                tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
                &opts.listen_addr
            })
            .parse()
            .unwrap();
        info!("advertise addr is {}", frontend_address);

        let rpc_addr: HostAddr = opts.frontend_rpc_listener_addr.parse().unwrap();
        let internal_rpc_host_addr = HostAddr {
            // Use the host of advertise address for the frontend rpc address.
            host: frontend_address.host.clone(),
            port: rpc_addr.port,
        };
        // Register in meta by calling `AddWorkerNode` RPC.
        let (meta_client, system_params_reader) = MetaClient::register_new(
            opts.meta_addr,
            WorkerType::Frontend,
            &frontend_address,
            AddWorkerNodeProperty {
                internal_rpc_host_addr: internal_rpc_host_addr.to_string(),
                ..Default::default()
            },
            Arc::new(config.meta.clone()),
        )
        .await;

        let worker_id = meta_client.worker_id();
        info!("Assigned worker node id {}", worker_id);

        // The heartbeat loop is the first background task; its handle and shutdown sender seed
        // the vectors returned to the caller.
        let (heartbeat_join_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
            meta_client.clone(),
            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
        );
        let mut join_handles = vec![heartbeat_join_handle];
        let mut shutdown_senders = vec![heartbeat_shutdown_sender];

        let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
        let hummock_snapshot_manager =
            Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));

        // The sender half of this watch channel goes to the observer node; the catalog writer
        // and the user info writer each hold a receiver.
        let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
        let catalog = Arc::new(RwLock::new(Catalog::default()));
        let catalog_writer = Arc::new(CatalogWriterImpl::new(
            meta_client.clone(),
            catalog_updated_rx.clone(),
            hummock_snapshot_manager.clone(),
        ));
        let catalog_reader = CatalogReader::new(catalog.clone());

        let worker_node_manager = Arc::new(WorkerNodeManager::new());

        let compute_client_pool = Arc::new(ComputeClientPool::new(
            config.batch_exchange_connection_pool_size(),
            config.batch.developer.compute_client_config.clone(),
        ));
        let frontend_client_pool = Arc::new(FrontendClientPool::new(
            1,
            config.batch.developer.frontend_client_config.clone(),
        ));
        let monitor_client_pool = Arc::new(MonitorClientPool::new(1, RpcClientConfig::default()));
        let query_manager = QueryManager::new(
            worker_node_manager.clone(),
            compute_client_pool.clone(),
            catalog_reader.clone(),
            Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
            config.batch.distributed_query_limit,
            config.batch.max_batch_queries_per_frontend_node,
        );

        let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
        let user_info_reader = UserInfoReader::new(user_info_manager.clone());
        let user_info_writer = Arc::new(UserInfoWriterImpl::new(
            meta_client.clone(),
            catalog_updated_rx,
        ));

        let system_params_manager =
            Arc::new(LocalSystemParamsManager::new(system_params_reader.clone()));

        LocalSecretManager::init(
            opts.temp_secret_file_dir,
            meta_client.cluster_id().to_owned(),
            worker_id,
        );

        // This `session_params` should be initialized during the initial notification in `observer_manager`
        let session_params = Arc::new(RwLock::new(SessionConfig::default()));
        let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
        let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));

        // The observer node receives handles to the local state built above (catalog, user
        // info, workers, snapshot manager, system/session params, compute client pool).
        let frontend_observer_node = FrontendObserverNode::new(
            worker_node_manager.clone(),
            catalog,
            catalog_updated_tx,
            user_info_manager,
            hummock_snapshot_manager.clone(),
            system_params_manager.clone(),
            session_params.clone(),
            compute_client_pool.clone(),
        );
        let observer_manager =
            ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
                .await;
        let observer_join_handle = observer_manager.start().await;
        join_handles.push(observer_join_handle);

        // Activation happens only after the observer has been started.
        meta_client.activate(&frontend_address).await?;

        let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
        let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
        let spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());

        if config.server.metrics_level > MetricLevel::Disabled {
            MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
        }

        let health_srv = HealthServiceImpl::new();
        let frontend_srv = FrontendServiceImpl::new(sessions_map.clone());
        let monitor_srv = MonitorServiceImpl::new(config.server.clone());
        let frontend_rpc_addr = opts.frontend_rpc_listener_addr.parse().unwrap();

        let telemetry_manager = TelemetryManager::new(
            Arc::new(meta_client.clone()),
            Arc::new(FrontendTelemetryCreator::new()),
        );

        // if the toml config file or env variable disables telemetry, do not watch system params
        // change because if any of configs disable telemetry, we should never start it
        if config.server.telemetry_enabled && telemetry_env_enabled() {
            let (join_handle, shutdown_sender) = telemetry_manager.start().await;
            join_handles.push(join_handle);
            shutdown_senders.push(shutdown_sender);
        } else {
            tracing::info!("Telemetry didn't start due to config");
        }

        // The gRPC server (health, frontend and monitor services) runs in a detached task: its
        // join handle is not tracked, and a serve error panics inside that task.
        tokio::spawn(async move {
            tonic::transport::Server::builder()
                .add_service(HealthServer::new(health_srv))
                .add_service(FrontendServiceServer::new(frontend_srv))
                .add_service(MonitorServiceServer::new(monitor_srv))
                .serve(frontend_rpc_addr)
                .await
                .unwrap();
        });
        info!(
            "Health Check RPC Listener is set up on {}",
            opts.frontend_rpc_listener_addr.clone()
        );

        let creating_streaming_job_tracker =
            Arc::new(StreamingJobTracker::new(frontend_meta_client.clone()));

        // Dedicated multi-thread runtime for batch work; the worker thread count is only
        // overridden when the config specifies one.
        let runtime = {
            let mut builder = Builder::new_multi_thread();
            if let Some(frontend_compute_runtime_worker_threads) =
                config.batch.frontend_compute_runtime_worker_threads
            {
                builder.worker_threads(frontend_compute_runtime_worker_threads);
            }
            builder
                .thread_name("rw-batch-local")
                .enable_all()
                .build()
                .unwrap()
        };
        let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));

        let sessions = sessions_map.clone();
        // Idle transaction background monitor: every 10 seconds, runs the idle-in-transaction
        // timeout check on each session while holding a read lock on the sessions map. The
        // result of each check is deliberately ignored.
        let join_handle = tokio::spawn(async move {
            let mut check_idle_txn_interval =
                tokio::time::interval(core::time::Duration::from_secs(10));
            check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            check_idle_txn_interval.reset();
            loop {
                check_idle_txn_interval.tick().await;
                sessions.read().values().for_each(|session| {
                    let _ = session.check_idle_in_transaction_timeout();
                })
            }
        });
        join_handles.push(join_handle);

        // Clean up the spill directory.
        #[cfg(not(madsim))]
        if config.batch.enable_spill {
            SpillOp::clean_spill_directory()
                .await
                .map_err(|err| anyhow!(err))?;
        }

        let total_memory_bytes = opts.frontend_total_memory_bytes;
        let heap_profiler =
            HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
        // Run a background heap profiler
        heap_profiler.start();

        // Batch execution gets a fixed proportion of the total frontend memory.
        let batch_memory_limit = total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION;
        let mem_context = MemoryContext::root(
            frontend_metrics.batch_total_mem.clone(),
            batch_memory_limit as u64,
        );
        #[cfg(feature = "datafusion")]
        let df_spillable_budget_ctx =
            crate::datafusion::create_df_spillable_budget_ctx(&mem_context);

        // Initialize Prometheus client if endpoint is provided. A construction failure is
        // logged and leaves the client unset instead of failing startup.
        let prometheus_client = if let Some(ref endpoint) = opts.prometheus_endpoint {
            match PrometheusClient::try_from(endpoint.as_str()) {
                Ok(client) => {
                    info!("Prometheus client initialized with endpoint: {}", endpoint);
                    Some(client)
                }
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        "failed to initialize Prometheus client",
                    );
                    None
                }
            }
        } else {
            None
        };

        // Initialize Prometheus selector
        let prometheus_selector = opts.prometheus_selector.unwrap_or_default();

        info!(
            "Frontend  total_memory: {} batch_memory: {}",
            convert(total_memory_bytes as _),
            convert(batch_memory_limit as _),
        );

        Ok((
            Self {
                catalog_reader,
                catalog_writer,
                user_info_reader,
                user_info_writer,
                worker_node_manager,
                meta_client: frontend_meta_client,
                query_manager,
                hummock_snapshot_manager,
                system_params_manager,
                session_params,
                server_addr: frontend_address,
                client_pool: compute_client_pool,
                frontend_client_pool,
                monitor_client_pool,
                frontend_metrics,
                cursor_metrics,
                spill_metrics,
                sessions_map,
                batch_config: config.batch,
                frontend_config: config.frontend,
                meta_config: config.meta,
                streaming_config: config.streaming,
                serverless_backfill_controller_addr: opts.serverless_backfill_controller_addr,
                udf_config: config.udf,
                source_metrics,
                creating_streaming_job_tracker,
                compute_runtime,
                mem_context,
                #[cfg(feature = "datafusion")]
                df_spillable_budget_ctx,
                prometheus_client,
                prometheus_selector,
            },
            join_handles,
            shutdown_senders,
        ))
    }
590
591    /// Get a reference to the frontend env's catalog writer.
592    ///
593    /// This method is intentionally private, and a write guard is required for the caller to
594    /// prove that the write operations are permitted in the current transaction.
595    fn catalog_writer(&self, _guard: transaction::WriteGuard) -> &dyn CatalogWriter {
596        &*self.catalog_writer
597    }
598
    /// Get a reference to the frontend env's catalog reader.
    ///
    /// Unlike `catalog_writer`, this accessor is public and requires no write guard.
    pub fn catalog_reader(&self) -> &CatalogReader {
        &self.catalog_reader
    }
603
604    /// Get a reference to the frontend env's user info writer.
605    ///
606    /// This method is intentionally private, and a write guard is required for the caller to
607    /// prove that the write operations are permitted in the current transaction.
608    fn user_info_writer(&self, _guard: transaction::WriteGuard) -> &dyn UserInfoWriter {
609        &*self.user_info_writer
610    }
611
    /// Get a reference to the frontend env's user info reader.
    ///
    /// Unlike `user_info_writer`, this accessor is public and requires no write guard.
    pub fn user_info_reader(&self) -> &UserInfoReader {
        &self.user_info_reader
    }
616
617    pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef {
618        self.worker_node_manager.clone()
619    }
620
621    pub fn meta_client(&self) -> &dyn FrontendMetaClient {
622        &*self.meta_client
623    }
624
625    pub fn meta_client_ref(&self) -> Arc<dyn FrontendMetaClient> {
626        self.meta_client.clone()
627    }
628
    /// Get a reference to the frontend env's query manager.
    pub fn query_manager(&self) -> &QueryManager {
        &self.query_manager
    }
632
    /// Get a reference to the shared hummock snapshot manager handle.
    ///
    /// Clone the returned handle if an owned one is needed.
    pub fn hummock_snapshot_manager(&self) -> &HummockSnapshotManagerRef {
        &self.hummock_snapshot_manager
    }
636
    /// Get a reference to the shared local system params manager handle.
    ///
    /// Clone the returned handle if an owned one is needed.
    pub fn system_params_manager(&self) -> &LocalSystemParamsManagerRef {
        &self.system_params_manager
    }
640
641    pub fn session_params_snapshot(&self) -> SessionConfig {
642        self.session_params.read_recursive().clone()
643    }
644
    /// Get the address of the serverless backfill controller ("sbc").
    ///
    /// `init` takes this value from `FrontendOpts`; `mock` leaves it as the empty string.
    pub fn sbc_address(&self) -> &String {
        &self.serverless_backfill_controller_addr
    }
648
    /// Get a reference to the Prometheus client if available.
    ///
    /// Returns `None` when no Prometheus endpoint was configured or when `init` failed to
    /// construct a client from it.
    pub fn prometheus_client(&self) -> Option<&PrometheusClient> {
        self.prometheus_client.as_ref()
    }
653
654    /// Get the Prometheus selector string.
655    pub fn prometheus_selector(&self) -> &str {
656        &self.prometheus_selector
657    }
658
    /// Get the address of this frontend node.
    ///
    /// `init` stores the advertise address here (which itself falls back to the listen address).
    pub fn server_address(&self) -> &HostAddr {
        &self.server_addr
    }
662
663    pub fn client_pool(&self) -> ComputeClientPoolRef {
664        self.client_pool.clone()
665    }
666
667    pub fn frontend_client_pool(&self) -> FrontendClientPoolRef {
668        self.frontend_client_pool.clone()
669    }
670
671    pub fn monitor_client_pool(&self) -> MonitorClientPoolRef {
672        self.monitor_client_pool.clone()
673    }
674
    /// Get a reference to the batch section of the node configuration.
    pub fn batch_config(&self) -> &BatchConfig {
        &self.batch_config
    }
678
    /// Get a reference to the frontend section of the node configuration.
    pub fn frontend_config(&self) -> &FrontendConfig {
        &self.frontend_config
    }
682
    /// Get a reference to the streaming section of the node configuration.
    pub fn streaming_config(&self) -> &StreamingConfig {
        &self.streaming_config
    }
686
    /// Get a reference to the UDF section of the node configuration.
    pub fn udf_config(&self) -> &UdfConfig {
        &self.udf_config
    }
690
691    pub fn source_metrics(&self) -> Arc<SourceMetrics> {
692        self.source_metrics.clone()
693    }
694
695    pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
696        self.spill_metrics.clone()
697    }
698
    /// Get a reference to the tracker of streaming jobs that are currently being created.
    ///
    /// The tracker is what cancel requests use to cancel creating streaming jobs.
    pub fn creating_streaming_job_tracker(&self) -> &StreamingJobTrackerRef {
        &self.creating_streaming_job_tracker
    }
702
    /// Get a reference to the shared map of sessions, keyed by `(process_id, secret_key)`.
    pub fn sessions_map(&self) -> &SessionMapRef {
        &self.sessions_map
    }
706
707    pub fn compute_runtime(&self) -> Arc<BackgroundShutdownRuntime> {
708        self.compute_runtime.clone()
709    }
710
711    /// Cancel queries (i.e. batch queries) in session.
712    /// If the session exists return true, otherwise, return false.
713    pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {
714        cancel_queries_in_session(session_id, self.sessions_map.clone())
715    }
716
717    /// Cancel creating jobs (i.e. streaming queries) in session.
718    /// If the session exists return true, otherwise, return false.
719    pub fn cancel_creating_jobs_in_session(&self, session_id: SessionId) -> bool {
720        cancel_creating_jobs_in_session(session_id, self.sessions_map.clone())
721    }
722
723    pub fn mem_context(&self) -> MemoryContext {
724        self.mem_context.clone()
725    }
726
727    #[cfg(feature = "datafusion")]
728    pub fn df_spillable_budget_ctx(&self) -> MemoryContext {
729        self.df_spillable_budget_ctx.clone()
730    }
731}
732
/// Database and user identity attached to a session (held in `SessionImpl::auth_context`).
#[derive(Clone)]
pub struct AuthContext {
    /// Name of the database.
    pub database: String,
    /// Name of the user.
    pub user_name: String,
    /// Id of the user.
    pub user_id: UserId,
}
739
740impl AuthContext {
741    pub fn new(database: String, user_name: String, user_id: UserId) -> Self {
742        Self {
743            database,
744            user_name,
745            user_id,
746        }
747    }
748}
/// State of a single client session on the frontend.
///
/// Instances are stored in the [`SessionMapRef`], keyed by `id`.
pub struct SessionImpl {
    /// The frontend environment this session runs in.
    env: FrontendEnv,
    /// Database and user identity of this session.
    auth_context: Arc<RwLock<AuthContext>>,
    /// Used for user authentication.
    user_authenticator: UserAuthenticator,
    /// Stores the value of configurations.
    config_map: Arc<RwLock<SessionConfig>>,

    /// Channel sender for frontend handler to send notices.
    notice_tx: UnboundedSender<String>,
    /// Channel receiver for pgwire to take notices and send to clients.
    notice_rx: Mutex<UnboundedReceiver<String>>,

    /// Identified by `process_id`, `secret_key`. Corresponds to `SessionManager`.
    id: (i32, i32),

    /// Client address
    peer_addr: AddressRef,

    /// Transaction state.
    /// TODO: get rid of the `Mutex` here; it is a workaround for the `Send` requirement of
    /// async functions, there should actually be no contention.
    txn: Arc<Mutex<transaction::State>>,

    /// Query cancel flag.
    /// This flag is set only when current query is executed in local mode, and used to cancel
    /// local query.
    current_query_cancel_flag: Mutex<Option<ShutdownSender>>,

    /// execution context represents the lifetime of a running SQL in the current session
    exec_context: Mutex<Option<Weak<ExecContext>>>,

    /// Last idle instant
    last_idle_instant: Arc<Mutex<Option<Instant>>>,

    /// Cursor manager of the current session.
    cursor_manager: Arc<CursorManager>,

    /// temporary sources for the current session
    temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,

    /// staging catalogs for the current session
    staging_catalog_manager: Arc<Mutex<StagingCatalogManager>>,
}
792
/// If TEMPORARY or TEMP is specified, the source is created as a temporary source.
/// Temporary sources are automatically dropped at the end of a session.
/// Temporary sources are expected to be selected by batch queries, not streaming queries.
/// Temporary sources currently are only used by cloud portal to preview the data during table and
/// source creation, so it is an internal feature and not exposed to users.
/// The current implementation supports temporary sources with minimum effort,
/// so we don't care about the database name and schema name, but only care about the source name.
/// Temporary sources can only be shown via `show sources` command but not other system tables.
#[derive(Default, Clone)]
pub struct TemporarySourceManager {
    /// Temporary sources of the session, keyed by source name.
    sources: HashMap<String, SourceCatalog>,
}
805
806impl TemporarySourceManager {
807    pub fn new() -> Self {
808        Self {
809            sources: HashMap::new(),
810        }
811    }
812
813    pub fn create_source(&mut self, name: String, source: SourceCatalog) {
814        self.sources.insert(name, source);
815    }
816
817    pub fn drop_source(&mut self, name: &str) {
818        self.sources.remove(name);
819    }
820
821    pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
822        self.sources.get(name)
823    }
824
825    pub fn keys(&self) -> Vec<String> {
826        self.sources.keys().cloned().collect()
827    }
828}
829
/// Staging catalog manager is used to manage the tables being created in the current session.
#[derive(Default, Clone)]
pub struct StagingCatalogManager {
    // Staging tables being created in the current session, keyed by table name.
    tables: HashMap<String, TableCatalog>,
}
836
837impl StagingCatalogManager {
838    pub fn new() -> Self {
839        Self {
840            tables: HashMap::new(),
841        }
842    }
843
844    pub fn create_table(&mut self, name: String, table: TableCatalog) {
845        self.tables.insert(name, table);
846    }
847
848    pub fn drop_table(&mut self, name: &str) {
849        self.tables.remove(name);
850    }
851
852    pub fn get_table(&self, name: &str) -> Option<&TableCatalog> {
853        self.tables.get(name)
854    }
855}
856
/// Error returned by `SessionImpl::check_relation_name_duplicated`.
///
/// Both variants display exactly as the error they wrap.
#[derive(Error, Debug)]
pub enum CheckRelationError {
    /// The qualified relation name could not be resolved.
    #[error("{0}")]
    Resolve(#[from] ResolveQualifiedNameError),
    /// The catalog reported an error (e.g. no valid schema, or a duplicated relation).
    #[error("{0}")]
    Catalog(#[from] CatalogError),
}
864
865impl From<CheckRelationError> for RwError {
866    fn from(e: CheckRelationError) -> Self {
867        match e {
868            CheckRelationError::Resolve(e) => e.into(),
869            CheckRelationError::Catalog(e) => e.into(),
870        }
871    }
872}
873
874impl SessionImpl {
875    pub(crate) fn new(
876        env: FrontendEnv,
877        auth_context: AuthContext,
878        user_authenticator: UserAuthenticator,
879        id: SessionId,
880        peer_addr: AddressRef,
881        session_config: SessionConfig,
882    ) -> Self {
883        let cursor_metrics = env.cursor_metrics.clone();
884        let (notice_tx, notice_rx) = mpsc::unbounded_channel();
885
886        Self {
887            env,
888            auth_context: Arc::new(RwLock::new(auth_context)),
889            user_authenticator,
890            config_map: Arc::new(RwLock::new(session_config)),
891            id,
892            peer_addr,
893            txn: Default::default(),
894            current_query_cancel_flag: Mutex::new(None),
895            notice_tx,
896            notice_rx: Mutex::new(notice_rx),
897            exec_context: Mutex::new(None),
898            last_idle_instant: Default::default(),
899            cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
900            temporary_source_manager: Default::default(),
901            staging_catalog_manager: Default::default(),
902        }
903    }
904
905    #[cfg(test)]
906    pub fn mock() -> Self {
907        let env = FrontendEnv::mock();
908        let (notice_tx, notice_rx) = mpsc::unbounded_channel();
909
910        Self {
911            env: FrontendEnv::mock(),
912            auth_context: Arc::new(RwLock::new(AuthContext::new(
913                DEFAULT_DATABASE_NAME.to_owned(),
914                DEFAULT_SUPER_USER.to_owned(),
915                DEFAULT_SUPER_USER_ID,
916            ))),
917            user_authenticator: UserAuthenticator::None,
918            config_map: Default::default(),
919            // Mock session use non-sense id.
920            id: (0, 0),
921            txn: Default::default(),
922            current_query_cancel_flag: Mutex::new(None),
923            notice_tx,
924            notice_rx: Mutex::new(notice_rx),
925            exec_context: Mutex::new(None),
926            peer_addr: Address::Tcp(SocketAddr::new(
927                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
928                8080,
929            ))
930            .into(),
931            last_idle_instant: Default::default(),
932            cursor_manager: Arc::new(CursorManager::new(env.cursor_metrics)),
933            temporary_source_manager: Default::default(),
934            staging_catalog_manager: Default::default(),
935        }
936    }
937
938    pub(crate) fn env(&self) -> &FrontendEnv {
939        &self.env
940    }
941
942    pub fn auth_context(&self) -> Arc<AuthContext> {
943        let ctx = self.auth_context.read();
944        Arc::new(ctx.clone())
945    }
946
947    pub fn database(&self) -> String {
948        self.auth_context.read().database.clone()
949    }
950
951    pub fn database_id(&self) -> DatabaseId {
952        let db_name = self.database();
953        self.env
954            .catalog_reader()
955            .read_guard()
956            .get_database_by_name(&db_name)
957            .map(|db| db.id())
958            .expect("session database not found")
959    }
960
961    pub fn user_name(&self) -> String {
962        self.auth_context.read().user_name.clone()
963    }
964
965    pub fn user_id(&self) -> UserId {
966        self.auth_context.read().user_id
967    }
968
969    pub fn update_database(&self, database: String) {
970        self.auth_context.write().database = database;
971    }
972
973    pub fn shared_config(&self) -> Arc<RwLock<SessionConfig>> {
974        Arc::clone(&self.config_map)
975    }
976
977    pub fn config(&self) -> RwLockReadGuard<'_, SessionConfig> {
978        self.config_map.read()
979    }
980
981    pub fn set_config(&self, key: &str, value: String) -> Result<String> {
982        self.config_map
983            .write()
984            .set(key, value, &mut ())
985            .map_err(Into::into)
986    }
987
988    pub fn reset_config(&self, key: &str) -> Result<String> {
989        self.config_map
990            .write()
991            .reset(key, &mut ())
992            .map_err(Into::into)
993    }
994
995    pub fn set_config_report(
996        &self,
997        key: &str,
998        value: Option<String>,
999        mut reporter: impl ConfigReporter,
1000    ) -> Result<String> {
1001        if let Some(value) = value {
1002            self.config_map
1003                .write()
1004                .set(key, value, &mut reporter)
1005                .map_err(Into::into)
1006        } else {
1007            self.config_map
1008                .write()
1009                .reset(key, &mut reporter)
1010                .map_err(Into::into)
1011        }
1012    }
1013
1014    pub fn session_id(&self) -> SessionId {
1015        self.id
1016    }
1017
1018    pub fn running_sql(&self) -> Option<Arc<str>> {
1019        self.exec_context
1020            .lock()
1021            .as_ref()
1022            .and_then(|weak| weak.upgrade())
1023            .map(|context| context.running_sql.clone())
1024    }
1025
1026    pub fn get_cursor_manager(&self) -> Arc<CursorManager> {
1027        self.cursor_manager.clone()
1028    }
1029
1030    pub fn peer_addr(&self) -> &Address {
1031        &self.peer_addr
1032    }
1033
1034    pub fn elapse_since_running_sql(&self) -> Option<u128> {
1035        self.exec_context
1036            .lock()
1037            .as_ref()
1038            .and_then(|weak| weak.upgrade())
1039            .map(|context| context.last_instant.elapsed().as_millis())
1040    }
1041
1042    pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
1043        self.last_idle_instant
1044            .lock()
1045            .as_ref()
1046            .map(|x| x.elapsed().as_millis())
1047    }
1048
1049    pub fn check_relation_name_duplicated(
1050        &self,
1051        name: ObjectName,
1052        stmt_type: StatementType,
1053        if_not_exists: bool,
1054    ) -> std::result::Result<Either<(), RwPgResponse>, CheckRelationError> {
1055        let db_name = &self.database();
1056        let catalog_reader = self.env().catalog_reader().read_guard();
1057        let (schema_name, relation_name) = {
1058            let (schema_name, relation_name) =
1059                Binder::resolve_schema_qualified_name(db_name, &name)?;
1060            let search_path = self.config().search_path();
1061            let user_name = &self.user_name();
1062            let schema_name = match schema_name {
1063                Some(schema_name) => schema_name,
1064                None => catalog_reader
1065                    .first_valid_schema(db_name, &search_path, user_name)?
1066                    .name(),
1067            };
1068            (schema_name, relation_name)
1069        };
1070        match catalog_reader.check_relation_name_duplicated(db_name, &schema_name, &relation_name) {
1071            Err(e) if if_not_exists => {
1072                if let CatalogErrorInner::Duplicated {
1073                    name,
1074                    under_creation,
1075                    ..
1076                } = e.inner()
1077                {
1078                    // If relation is created, return directly.
1079                    // Otherwise, the job status is `is_creating`. Since frontend receives the catalog asynchronously, we can't
1080                    // determine the real status of the meta at this time. We regard it as `not_exists` and delay the check to meta.
1081                    // Only the type in StreamingJob (defined in streaming_job.rs) and Subscription may be `is_creating`.
1082                    if !*under_creation {
1083                        Ok(Either::Right(
1084                            PgResponse::builder(stmt_type)
1085                                .notice(format!("relation \"{}\" already exists, skipping", name))
1086                                .into(),
1087                        ))
1088                    } else if stmt_type == StatementType::CREATE_SUBSCRIPTION {
1089                        // For now, when a Subscription is creating, we return directly with an additional message.
1090                        // TODO: Subscription should also be processed in the same way as StreamingJob.
1091                        Ok(Either::Right(
1092                            PgResponse::builder(stmt_type)
1093                                .notice(format!(
1094                                    "relation \"{}\" already exists but still creating, skipping",
1095                                    name
1096                                ))
1097                                .into(),
1098                        ))
1099                    } else {
1100                        Ok(Either::Left(()))
1101                    }
1102                } else {
1103                    Err(e.into())
1104                }
1105            }
1106            Err(e) => Err(e.into()),
1107            Ok(_) => Ok(Either::Left(())),
1108        }
1109    }
1110
1111    pub fn check_secret_name_duplicated(&self, name: ObjectName) -> Result<()> {
1112        let db_name = &self.database();
1113        let catalog_reader = self.env().catalog_reader().read_guard();
1114        let (schema_name, secret_name) = {
1115            let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1116            let search_path = self.config().search_path();
1117            let user_name = &self.user_name();
1118            let schema_name = match schema_name {
1119                Some(schema_name) => schema_name,
1120                None => catalog_reader
1121                    .first_valid_schema(db_name, &search_path, user_name)?
1122                    .name(),
1123            };
1124            (schema_name, secret_name)
1125        };
1126        catalog_reader
1127            .check_secret_name_duplicated(db_name, &schema_name, &secret_name)
1128            .map_err(RwError::from)
1129    }
1130
1131    pub fn check_connection_name_duplicated(&self, name: ObjectName) -> Result<()> {
1132        let db_name = &self.database();
1133        let catalog_reader = self.env().catalog_reader().read_guard();
1134        let (schema_name, connection_name) = {
1135            let (schema_name, connection_name) =
1136                Binder::resolve_schema_qualified_name(db_name, &name)?;
1137            let search_path = self.config().search_path();
1138            let user_name = &self.user_name();
1139            let schema_name = match schema_name {
1140                Some(schema_name) => schema_name,
1141                None => catalog_reader
1142                    .first_valid_schema(db_name, &search_path, user_name)?
1143                    .name(),
1144            };
1145            (schema_name, connection_name)
1146        };
1147        catalog_reader
1148            .check_connection_name_duplicated(db_name, &schema_name, &connection_name)
1149            .map_err(RwError::from)
1150    }
1151
1152    pub fn check_function_name_duplicated(
1153        &self,
1154        stmt_type: StatementType,
1155        name: ObjectName,
1156        arg_types: &[DataType],
1157        if_not_exists: bool,
1158    ) -> Result<Either<(), RwPgResponse>> {
1159        let db_name = &self.database();
1160        let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1161        let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?;
1162
1163        let catalog_reader = self.env().catalog_reader().read_guard();
1164        if catalog_reader
1165            .get_schema_by_id(database_id, schema_id)?
1166            .get_function_by_name_args(&function_name, arg_types)
1167            .is_some()
1168        {
1169            let full_name = format!(
1170                "{function_name}({})",
1171                arg_types.iter().map(|t| t.to_string()).join(",")
1172            );
1173            if if_not_exists {
1174                Ok(Either::Right(
1175                    PgResponse::builder(stmt_type)
1176                        .notice(format!(
1177                            "function \"{}\" already exists, skipping",
1178                            full_name
1179                        ))
1180                        .into(),
1181                ))
1182            } else {
1183                Err(CatalogError::duplicated("function", full_name).into())
1184            }
1185        } else {
1186            Ok(Either::Left(()))
1187        }
1188    }
1189
1190    /// Also check if the user has the privilege to create in the schema.
1191    pub fn get_database_and_schema_id_for_create(
1192        &self,
1193        schema_name: Option<String>,
1194    ) -> Result<(DatabaseId, SchemaId)> {
1195        let db_name = &self.database();
1196
1197        let search_path = self.config().search_path();
1198        let user_name = &self.user_name();
1199
1200        let catalog_reader = self.env().catalog_reader().read_guard();
1201        let schema = match schema_name {
1202            Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
1203            None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
1204        };
1205        let schema_name = schema.name();
1206
1207        check_schema_writable(&schema_name)?;
1208        self.check_privileges(&[ObjectCheckItem::new(
1209            schema.owner(),
1210            AclMode::Create,
1211            schema_name,
1212            schema.id(),
1213        )])?;
1214
1215        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1216        Ok((db_id, schema.id()))
1217    }
1218
1219    pub fn get_connection_by_name(
1220        &self,
1221        schema_name: Option<String>,
1222        connection_name: &str,
1223    ) -> Result<Arc<ConnectionCatalog>> {
1224        let db_name = &self.database();
1225        let search_path = self.config().search_path();
1226        let user_name = &self.user_name();
1227
1228        let catalog_reader = self.env().catalog_reader().read_guard();
1229        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1230        let (connection, _) =
1231            catalog_reader.get_connection_by_name(db_name, schema_path, connection_name)?;
1232
1233        self.check_privileges(&[ObjectCheckItem::new(
1234            connection.owner(),
1235            AclMode::Usage,
1236            connection.name.clone(),
1237            connection.id,
1238        )])?;
1239
1240        Ok(connection.clone())
1241    }
1242
1243    pub fn get_subscription_by_schema_id_name(
1244        &self,
1245        schema_id: SchemaId,
1246        subscription_name: &str,
1247    ) -> Result<Arc<SubscriptionCatalog>> {
1248        let db_name = &self.database();
1249
1250        let catalog_reader = self.env().catalog_reader().read_guard();
1251        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1252        let schema = catalog_reader.get_schema_by_id(db_id, schema_id)?;
1253        let subscription = schema
1254            .get_subscription_by_name(subscription_name)
1255            .ok_or_else(|| {
1256                RwError::from(ErrorCode::ItemNotFound(format!(
1257                    "subscription {} not found",
1258                    subscription_name
1259                )))
1260            })?;
1261        Ok(subscription.clone())
1262    }
1263
1264    pub fn get_subscription_by_name(
1265        &self,
1266        schema_name: Option<String>,
1267        subscription_name: &str,
1268    ) -> Result<Arc<SubscriptionCatalog>> {
1269        let db_name = &self.database();
1270        let search_path = self.config().search_path();
1271        let user_name = &self.user_name();
1272
1273        let catalog_reader = self.env().catalog_reader().read_guard();
1274        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1275        let (subscription, _) =
1276            catalog_reader.get_subscription_by_name(db_name, schema_path, subscription_name)?;
1277        Ok(subscription.clone())
1278    }
1279
1280    pub fn get_table_by_id(&self, table_id: TableId) -> Result<Arc<TableCatalog>> {
1281        let catalog_reader = self.env().catalog_reader().read_guard();
1282        Ok(catalog_reader.get_any_table_by_id(table_id)?.clone())
1283    }
1284
1285    pub fn get_table_by_name(
1286        &self,
1287        table_name: &str,
1288        db_id: DatabaseId,
1289        schema_id: SchemaId,
1290    ) -> Result<Arc<TableCatalog>> {
1291        let catalog_reader = self.env().catalog_reader().read_guard();
1292        let table = catalog_reader
1293            .get_schema_by_id(db_id, schema_id)?
1294            .get_created_table_by_name(table_name)
1295            .ok_or_else(|| {
1296                Error::new(
1297                    ErrorKind::InvalidInput,
1298                    format!("table \"{}\" does not exist", table_name),
1299                )
1300            })?;
1301
1302        self.check_privileges(&[ObjectCheckItem::new(
1303            table.owner(),
1304            AclMode::Select,
1305            table_name.to_owned(),
1306            table.id,
1307        )])?;
1308
1309        Ok(table.clone())
1310    }
1311
1312    pub fn get_secret_by_name(
1313        &self,
1314        schema_name: Option<String>,
1315        secret_name: &str,
1316    ) -> Result<Arc<SecretCatalog>> {
1317        let db_name = &self.database();
1318        let search_path = self.config().search_path();
1319        let user_name = &self.user_name();
1320
1321        let catalog_reader = self.env().catalog_reader().read_guard();
1322        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1323        let (secret, _) = catalog_reader.get_secret_by_name(db_name, schema_path, secret_name)?;
1324
1325        self.check_privileges(&[ObjectCheckItem::new(
1326            secret.owner(),
1327            AclMode::Usage,
1328            secret.name.clone(),
1329            secret.id,
1330        )])?;
1331
1332        Ok(secret.clone())
1333    }
1334
1335    pub async fn list_change_log_epochs(
1336        &self,
1337        table_id: TableId,
1338        min_epoch: u64,
1339        max_count: u32,
1340    ) -> Result<Vec<u64>> {
1341        let Some(max_epoch) = self
1342            .env
1343            .hummock_snapshot_manager()
1344            .acquire()
1345            .version()
1346            .state_table_info
1347            .info()
1348            .get(&table_id)
1349            .map(|s| s.committed_epoch)
1350        else {
1351            return Ok(vec![]);
1352        };
1353        let ret = self
1354            .env
1355            .meta_client_ref()
1356            .get_hummock_table_change_log(
1357                Some(min_epoch),
1358                Some(max_epoch),
1359                Some(iter::once(table_id).collect()),
1360                true,
1361                Some(max_count),
1362            )
1363            .await?;
1364        let Some(e) = ret.get(&table_id) else {
1365            return Ok(vec![]);
1366        };
1367        Ok(e.iter()
1368            .flat_map(|l| l.epochs())
1369            .filter(|e| *e >= min_epoch && *e <= max_epoch)
1370            .take(max_count as usize)
1371            .collect())
1372    }
1373
1374    pub fn clear_cancel_query_flag(&self) {
1375        let mut flag = self.current_query_cancel_flag.lock();
1376        *flag = None;
1377    }
1378
1379    pub fn reset_cancel_query_flag(&self) -> ShutdownToken {
1380        let mut flag = self.current_query_cancel_flag.lock();
1381        let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
1382        *flag = Some(shutdown_tx);
1383        shutdown_rx
1384    }
1385
1386    pub fn cancel_current_query(&self) {
1387        let mut flag_guard = self.current_query_cancel_flag.lock();
1388        if let Some(sender) = flag_guard.take() {
1389            info!("Trying to cancel query in local mode.");
1390            // Current running query is in local mode
1391            sender.cancel();
1392            info!("Cancel query request sent.");
1393        } else {
1394            info!("Trying to cancel query in distributed mode.");
1395            self.env.query_manager().cancel_queries_in_session(self.id)
1396        }
1397    }
1398
1399    pub fn cancel_current_creating_job(&self) {
1400        self.env.creating_streaming_job_tracker.abort_jobs(self.id);
1401    }
1402
1403    /// This function only used for test now.
1404    /// Maybe we can remove it in the future.
1405    pub async fn run_statement(
1406        self: Arc<Self>,
1407        sql: Arc<str>,
1408        formats: Vec<Format>,
1409    ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
1410        // Parse sql.
1411        let mut stmts = Parser::parse_sql(&sql)?;
1412        if stmts.is_empty() {
1413            return Ok(PgResponse::empty_result(
1414                pgwire::pg_response::StatementType::EMPTY,
1415            ));
1416        }
1417        if stmts.len() > 1 {
1418            return Ok(
1419                PgResponse::builder(pgwire::pg_response::StatementType::EMPTY)
1420                    .notice("cannot insert multiple commands into statement")
1421                    .into(),
1422            );
1423        }
1424        let stmt = stmts.swap_remove(0);
1425        let rsp = Box::pin(handle(self, stmt, sql.clone(), formats)).await?;
1426        Ok(rsp)
1427    }
1428
1429    pub fn notice_to_user(&self, str: impl Into<String>) {
1430        let notice = str.into();
1431        tracing::trace!(notice, "notice to user");
1432        self.notice_tx
1433            .send(notice)
1434            .expect("notice channel should not be closed");
1435    }
1436
1437    pub fn is_barrier_read(&self) -> bool {
1438        match self.config().visibility_mode() {
1439            VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
1440            VisibilityMode::All => true,
1441            VisibilityMode::Checkpoint => false,
1442        }
1443    }
1444
1445    pub fn statement_timeout(&self) -> Duration {
1446        if self.config().statement_timeout().millis() == 0 {
1447            Duration::from_secs(self.env.batch_config.statement_timeout_in_sec as u64)
1448        } else {
1449            Duration::from_millis(self.config().statement_timeout().millis() as u64)
1450        }
1451    }
1452
1453    pub fn create_temporary_source(&self, source: SourceCatalog) {
1454        self.temporary_source_manager
1455            .lock()
1456            .create_source(source.name.clone(), source);
1457    }
1458
1459    pub fn get_temporary_source(&self, name: &str) -> Option<SourceCatalog> {
1460        self.temporary_source_manager
1461            .lock()
1462            .get_source(name)
1463            .cloned()
1464    }
1465
1466    pub fn drop_temporary_source(&self, name: &str) {
1467        self.temporary_source_manager.lock().drop_source(name);
1468    }
1469
1470    pub fn temporary_source_manager(&self) -> TemporarySourceManager {
1471        self.temporary_source_manager.lock().clone()
1472    }
1473
1474    pub fn create_staging_table(&self, table: TableCatalog) {
1475        self.staging_catalog_manager
1476            .lock()
1477            .create_table(table.name.clone(), table);
1478    }
1479
1480    pub fn drop_staging_table(&self, name: &str) {
1481        self.staging_catalog_manager.lock().drop_table(name);
1482    }
1483
1484    pub fn staging_catalog_manager(&self) -> StagingCatalogManager {
1485        self.staging_catalog_manager.lock().clone()
1486    }
1487
1488    pub async fn check_cluster_limits(&self) -> Result<()> {
1489        if self.config().bypass_cluster_limits() {
1490            return Ok(());
1491        }
1492
1493        let gen_message = |ActorCountPerParallelism {
1494                               worker_id_to_actor_count,
1495                               hard_limit,
1496                               soft_limit,
1497                           }: ActorCountPerParallelism,
1498                           exceed_hard_limit: bool|
1499         -> String {
1500            let (limit_type, action) = if exceed_hard_limit {
1501                ("critical", "Scale the cluster immediately to proceed.")
1502            } else {
1503                (
1504                    "recommended",
1505                    "Consider scaling the cluster for optimal performance.",
1506                )
1507            };
1508            format!(
1509                r#"Actor count per parallelism exceeds the {limit_type} limit.
1510
1511Depending on your workload, this may overload the cluster and cause performance/stability issues. {action}
1512
1513HINT:
1514- For best practices on managing streaming jobs: https://docs.risingwave.com/operate/manage-a-large-number-of-streaming-jobs
1515- To bypass the check (if the cluster load is acceptable): `[ALTER SYSTEM] SET bypass_cluster_limits TO true`.
1516  See https://docs.risingwave.com/operate/view-configure-runtime-parameters#how-to-configure-runtime-parameters
1517- Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.
1518
1519DETAILS:
1520- hard limit: {hard_limit}
1521- soft limit: {soft_limit}
1522- worker_id_to_actor_count: {worker_id_to_actor_count:?}"#,
1523            )
1524        };
1525
1526        let limits = self.env().meta_client().get_cluster_limits().await?;
1527        for limit in limits {
1528            match limit {
1529                cluster_limit::ClusterLimit::ActorCount(l) => {
1530                    if l.exceed_hard_limit() {
1531                        return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
1532                            l, true,
1533                        ))));
1534                    } else if l.exceed_soft_limit() {
1535                        self.notice_to_user(gen_message(l, false));
1536                    }
1537                }
1538            }
1539        }
1540        Ok(())
1541    }
1542}
1543
/// Global slot for the frontend's [`SessionManagerImpl`].
///
/// Backed by [`std::sync::OnceLock`], so it starts empty and can be set at most once. Where
/// it gets initialized is not part of this section of the file.
pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
    std::sync::OnceLock::new();
1546
/// [`SessionManager`] implementation of the frontend, built on top of a [`FrontendEnv`].
pub struct SessionManagerImpl {
    /// Frontend environment; holds the session map and the metrics updated by this manager.
    env: FrontendEnv,
    /// Join handles returned by `FrontendEnv::init`, kept for as long as the manager lives.
    _join_handles: Vec<JoinHandle<()>>,
    /// Shutdown senders returned by `FrontendEnv::init`, kept for as long as the manager lives.
    _shutdown_senders: Vec<Sender<()>>,
    /// Counter starting at 0; presumably used when allocating session ids — TODO confirm.
    number: AtomicI32,
}
1553
1554impl SessionManager for SessionManagerImpl {
1555    type Error = RwError;
1556    type Session = SessionImpl;
1557
1558    fn create_dummy_session(&self, database_id: DatabaseId) -> Result<Arc<Self::Session>> {
1559        let dummy_addr = Address::Tcp(SocketAddr::new(
1560            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1561            5691, // port of meta
1562        ));
1563
1564        // Always use the built-in super user for dummy sessions.
1565        // This avoids permission checks tied to a specific user_id.
1566        self.connect_inner(
1567            database_id,
1568            risingwave_common::catalog::DEFAULT_SUPER_USER,
1569            Arc::new(dummy_addr),
1570        )
1571    }
1572
1573    fn connect(
1574        &self,
1575        database: &str,
1576        user_name: &str,
1577        peer_addr: AddressRef,
1578    ) -> Result<Arc<Self::Session>> {
1579        let catalog_reader = self.env.catalog_reader();
1580        let reader = catalog_reader.read_guard();
1581        let database_id = reader.get_database_by_name(database)?.id();
1582
1583        self.connect_inner(database_id, user_name, peer_addr)
1584    }
1585
1586    /// Used when cancel request happened.
1587    fn cancel_queries_in_session(&self, session_id: SessionId) {
1588        self.env.cancel_queries_in_session(session_id);
1589    }
1590
1591    fn cancel_creating_jobs_in_session(&self, session_id: SessionId) {
1592        self.env.cancel_creating_jobs_in_session(session_id);
1593    }
1594
1595    fn end_session(&self, session: &Self::Session) {
1596        self.delete_session(&session.session_id());
1597    }
1598
1599    async fn shutdown(&self) {
1600        // Clean up the session map.
1601        self.env.sessions_map().write().clear();
1602        // Unregister from the meta service.
1603        self.env.meta_client().try_unregister().await;
1604    }
1605}
1606
1607impl SessionManagerImpl {
1608    pub async fn new(opts: FrontendOpts) -> Result<Self> {
1609        // TODO(shutdown): only save join handles that **need** to be shutdown
1610        let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?;
1611        Ok(Self {
1612            env,
1613            _join_handles: join_handles,
1614            _shutdown_senders: shutdown_senders,
1615            number: AtomicI32::new(0),
1616        })
1617    }
1618
1619    pub(crate) fn env(&self) -> &FrontendEnv {
1620        &self.env
1621    }
1622
1623    fn insert_session(&self, session: Arc<SessionImpl>) {
1624        let active_sessions = {
1625            let mut write_guard = self.env.sessions_map.write();
1626            write_guard.insert(session.id(), session);
1627            write_guard.len()
1628        };
1629        self.env
1630            .frontend_metrics
1631            .active_sessions
1632            .set(active_sessions as i64);
1633    }
1634
1635    fn delete_session(&self, session_id: &SessionId) {
1636        let active_sessions = {
1637            let mut write_guard = self.env.sessions_map.write();
1638            write_guard.remove(session_id);
1639            write_guard.len()
1640        };
1641        self.env
1642            .frontend_metrics
1643            .active_sessions
1644            .set(active_sessions as i64);
1645    }
1646
1647    fn connect_inner(
1648        &self,
1649        database_id: DatabaseId,
1650        user_name: &str,
1651        peer_addr: AddressRef,
1652    ) -> Result<Arc<SessionImpl>> {
1653        let catalog_reader = self.env.catalog_reader();
1654        let reader = catalog_reader.read_guard();
1655        let (database_name, database_owner) = {
1656            let db = reader.get_database_by_id(database_id)?;
1657            (db.name(), db.owner())
1658        };
1659
1660        let user_reader = self.env.user_info_reader();
1661        let reader = user_reader.read_guard();
1662        if let Some(user) = reader.get_user_by_name(user_name) {
1663            if !user.can_login {
1664                bail_permission_denied!("User {} is not allowed to login", user_name);
1665            }
1666            let has_privilege = user.has_privilege(database_id, AclMode::Connect);
1667            if !user.is_super && database_owner != user.id && !has_privilege {
1668                bail_permission_denied!("User does not have CONNECT privilege.");
1669            }
1670
1671            // Check HBA configuration for LDAP authentication
1672            let (connection_type, client_addr) = match peer_addr.as_ref() {
1673                Address::Tcp(socket_addr) => (ConnectionType::Host, Some(&socket_addr.ip())),
1674                Address::Unix(_) => (ConnectionType::Local, None),
1675            };
1676            tracing::debug!(
1677                "receive connection: type={:?}, client_addr={:?}",
1678                connection_type,
1679                client_addr
1680            );
1681
1682            let hba_entry_opt = self.env.frontend_config().hba_config.find_matching_entry(
1683                &connection_type,
1684                database_name,
1685                user_name,
1686                client_addr,
1687            );
1688
1689            let Some(hba_entry_opt) = hba_entry_opt else {
1690                bail_permission_denied!(
1691                    "no pg_hba.conf entry for host \"{peer_addr}\", user \"{user_name}\", database \"{database_name}\""
1692                );
1693            };
1694
1695            // Determine the user authenticator based on the user's auth info.
1696            let authenticator_by_info = || -> Result<UserAuthenticator> {
1697                let authenticator = match &user.auth_info {
1698                    None => UserAuthenticator::None,
1699                    Some(auth_info) => match auth_info.encryption_type() {
1700                        EncryptionType::Plaintext => {
1701                            UserAuthenticator::ClearText(auth_info.encrypted_value.clone())
1702                        }
1703                        EncryptionType::Md5 => {
1704                            let mut salt = [0; 4];
1705                            let mut rng = rand::rng();
1706                            rng.fill_bytes(&mut salt);
1707                            UserAuthenticator::Md5WithSalt {
1708                                encrypted_password: md5_hash_with_salt(
1709                                    &auth_info.encrypted_value,
1710                                    &salt,
1711                                ),
1712                                salt,
1713                            }
1714                        }
1715                        EncryptionType::Oauth => UserAuthenticator::OAuth {
1716                            metadata: auth_info.metadata.clone(),
1717                            cluster_id: self.env.meta_client().cluster_id().to_owned(),
1718                        },
1719                        _ => {
1720                            bail_protocol_error!(
1721                                "Unsupported auth type: {}",
1722                                auth_info.encryption_type().as_str_name()
1723                            );
1724                        }
1725                    },
1726                };
1727                Ok(authenticator)
1728            };
1729
1730            let user_authenticator = match (&hba_entry_opt.auth_method, &user.auth_info) {
1731                (AuthMethod::Trust, _) => UserAuthenticator::None,
1732                // For backward compatibility, we allow password auth method to work with any auth info.
1733                (AuthMethod::Password, _) => authenticator_by_info()?,
1734                (AuthMethod::Md5, Some(auth_info))
1735                    if auth_info.encryption_type() == EncryptionType::Md5 =>
1736                {
1737                    authenticator_by_info()?
1738                }
1739                (AuthMethod::OAuth, Some(auth_info))
1740                    if auth_info.encryption_type() == EncryptionType::Oauth =>
1741                {
1742                    authenticator_by_info()?
1743                }
1744                (AuthMethod::Ldap, _) => {
1745                    UserAuthenticator::Ldap(user_name.to_owned(), hba_entry_opt.clone())
1746                }
1747                _ => {
1748                    bail_permission_denied!(
1749                        "password authentication failed for user \"{user_name}\""
1750                    );
1751                }
1752            };
1753
1754            // Assign a session id and insert into sessions map (for cancel request).
1755            let secret_key = self.number.fetch_add(1, Ordering::Relaxed);
1756            // Use a trivial strategy: process_id and secret_key are equal.
1757            let id = (secret_key, secret_key);
1758            // Read session params snapshot from frontend env.
1759            let session_config = self.env.session_params_snapshot();
1760
1761            let session_impl: Arc<SessionImpl> = SessionImpl::new(
1762                self.env.clone(),
1763                AuthContext::new(database_name.to_owned(), user_name.to_owned(), user.id),
1764                user_authenticator,
1765                id,
1766                peer_addr,
1767                session_config,
1768            )
1769            .into();
1770            self.insert_session(session_impl.clone());
1771
1772            Ok(session_impl)
1773        } else {
1774            bail_catalog_error!("Role {} does not exist", user_name);
1775        }
1776    }
1777}
1778
1779impl Session for SessionImpl {
1780    type Error = RwError;
1781    type Portal = Portal;
1782    type PreparedStatement = PrepareStatement;
1783    type ValuesStream = PgResponseStream;
1784
1785    /// A copy of `run_statement` but exclude the parser part so each run must be at most one
1786    /// statement. The str sql use the `to_string` of AST. Consider Reuse later.
1787    async fn run_one_query(
1788        self: Arc<Self>,
1789        stmt: Statement,
1790        format: Format,
1791    ) -> Result<PgResponse<PgResponseStream>> {
1792        let string = stmt.to_string();
1793        let sql_str = string.as_str();
1794        let sql: Arc<str> = Arc::from(sql_str);
1795        // The handle can be slow. Release potential large String early.
1796        drop(string);
1797        let rsp = Box::pin(handle(self, stmt, sql, vec![format])).await?;
1798        Ok(rsp)
1799    }
1800
1801    fn user_authenticator(&self) -> &UserAuthenticator {
1802        &self.user_authenticator
1803    }
1804
1805    fn id(&self) -> SessionId {
1806        self.id
1807    }
1808
1809    async fn parse(
1810        self: Arc<Self>,
1811        statement: Option<Statement>,
1812        params_types: Vec<Option<DataType>>,
1813    ) -> Result<PrepareStatement> {
1814        Ok(if let Some(statement) = statement {
1815            handle_parse(self, statement, params_types).await?
1816        } else {
1817            PrepareStatement::Empty
1818        })
1819    }
1820
1821    fn bind(
1822        self: Arc<Self>,
1823        prepare_statement: PrepareStatement,
1824        params: Vec<Option<Bytes>>,
1825        param_formats: Vec<Format>,
1826        result_formats: Vec<Format>,
1827    ) -> Result<Portal> {
1828        handle_bind(prepare_statement, params, param_formats, result_formats)
1829    }
1830
1831    async fn execute(self: Arc<Self>, portal: Portal) -> Result<PgResponse<PgResponseStream>> {
1832        let rsp = Box::pin(handle_execute(self, portal)).await?;
1833        Ok(rsp)
1834    }
1835
1836    fn describe_statement(
1837        self: Arc<Self>,
1838        prepare_statement: PrepareStatement,
1839    ) -> Result<(Vec<DataType>, Vec<PgFieldDescriptor>)> {
1840        Ok(match prepare_statement {
1841            PrepareStatement::Empty => (vec![], vec![]),
1842            PrepareStatement::Prepared(prepare_statement) => (
1843                prepare_statement.bound_result.param_types,
1844                infer(
1845                    Some(prepare_statement.bound_result.bound),
1846                    prepare_statement.statement,
1847                )?,
1848            ),
1849            PrepareStatement::PureStatement(statement) => (vec![], infer(None, statement)?),
1850        })
1851    }
1852
1853    fn describe_portal(self: Arc<Self>, portal: Portal) -> Result<Vec<PgFieldDescriptor>> {
1854        match portal {
1855            Portal::Empty => Ok(vec![]),
1856            Portal::Portal(portal) => {
1857                let mut columns = infer(Some(portal.bound_result.bound), portal.statement)?;
1858                let formats = FormatIterator::new(&portal.result_formats, columns.len())
1859                    .map_err(|e| RwError::from(ErrorCode::ProtocolError(e)))?;
1860                columns.iter_mut().zip_eq_fast(formats).for_each(|(c, f)| {
1861                    if f == Format::Binary {
1862                        c.set_to_binary()
1863                    }
1864                });
1865                Ok(columns)
1866            }
1867            Portal::PureStatement(statement) => Ok(infer(None, statement)?),
1868        }
1869    }
1870
1871    fn get_config(&self, key: &str) -> Result<String> {
1872        self.config().get(key).map_err(Into::into)
1873    }
1874
1875    fn set_config(&self, key: &str, value: String) -> Result<String> {
1876        Self::set_config(self, key, value)
1877    }
1878
1879    async fn next_notice(self: &Arc<Self>) -> String {
1880        std::future::poll_fn(|cx| self.clone().notice_rx.lock().poll_recv(cx))
1881            .await
1882            .expect("notice channel should not be closed")
1883    }
1884
1885    fn transaction_status(&self) -> TransactionStatus {
1886        match &*self.txn.lock() {
1887            transaction::State::Initial | transaction::State::Implicit(_) => {
1888                TransactionStatus::Idle
1889            }
1890            transaction::State::Explicit(_) => TransactionStatus::InTransaction,
1891            // TODO: failed transaction
1892        }
1893    }
1894
1895    /// Init and return an `ExecContextGuard` which could be used as a guard to represent the execution flow.
1896    fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard {
1897        let exec_context = Arc::new(ExecContext {
1898            running_sql: sql,
1899            last_instant: Instant::now(),
1900            last_idle_instant: self.last_idle_instant.clone(),
1901        });
1902        *self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
1903        // unset idle state, since there is a sql running
1904        *self.last_idle_instant.lock() = None;
1905        ExecContextGuard::new(exec_context)
1906    }
1907
1908    /// Check whether idle transaction timeout.
1909    /// If yes, unpin snapshot and return an `IdleInTxnTimeout` error.
1910    fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
1911        // In transaction.
1912        if matches!(self.transaction_status(), TransactionStatus::InTransaction) {
1913            let idle_in_transaction_session_timeout =
1914                self.config().idle_in_transaction_session_timeout() as u128;
1915            // Idle transaction timeout has been enabled.
1916            if idle_in_transaction_session_timeout != 0 {
1917                // Hold the `exec_context` lock to ensure no new sql coming when unpin_snapshot.
1918                let guard = self.exec_context.lock();
1919                // No running sql i.e. idle
1920                if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() {
1921                    // Idle timeout.
1922                    if let Some(elapse_since_last_idle_instant) =
1923                        self.elapse_since_last_idle_instant()
1924                        && elapse_since_last_idle_instant > idle_in_transaction_session_timeout
1925                    {
1926                        return Err(PsqlError::IdleInTxnTimeout);
1927                    }
1928                }
1929            }
1930        }
1931        Ok(())
1932    }
1933
1934    fn user(&self) -> String {
1935        self.user_name()
1936    }
1937}
1938
1939/// Returns row description of the statement
1940fn infer(bound: Option<BoundStatement>, stmt: Statement) -> Result<Vec<PgFieldDescriptor>> {
1941    match stmt {
1942        Statement::Query(_)
1943        | Statement::Insert { .. }
1944        | Statement::Delete { .. }
1945        | Statement::Update { .. }
1946        | Statement::FetchCursor { .. } => Ok(bound
1947            .unwrap()
1948            .output_fields()
1949            .iter()
1950            .map(to_pg_field)
1951            .collect()),
1952        Statement::ShowObjects {
1953            object: show_object,
1954            ..
1955        } => Ok(infer_show_object(&show_object)),
1956        Statement::ShowCreateObject { .. } => Ok(infer_show_create_object()),
1957        Statement::ShowTransactionIsolationLevel => {
1958            let name = "transaction_isolation";
1959            Ok(infer_show_variable(name))
1960        }
1961        Statement::ShowVariable { variable } => {
1962            let name = &variable[0].real_value().to_lowercase();
1963            Ok(infer_show_variable(name))
1964        }
1965        Statement::Describe { name: _, kind } => Ok(infer_describe(&kind)),
1966        Statement::Explain { .. } => Ok(vec![PgFieldDescriptor::new(
1967            "QUERY PLAN".to_owned(),
1968            DataType::Varchar.to_oid(),
1969            DataType::Varchar.type_len(),
1970        )]),
1971        _ => Ok(vec![]),
1972    }
1973}
1974
1975pub struct WorkerProcessId {
1976    pub worker_id: WorkerId,
1977    pub process_id: i32,
1978}
1979
1980impl WorkerProcessId {
1981    pub fn new(worker_id: WorkerId, process_id: i32) -> Self {
1982        Self {
1983            worker_id,
1984            process_id,
1985        }
1986    }
1987}
1988
1989impl Display for WorkerProcessId {
1990    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1991        write!(f, "{}:{}", self.worker_id, self.process_id)
1992    }
1993}
1994
1995impl TryFrom<String> for WorkerProcessId {
1996    type Error = String;
1997
1998    fn try_from(worker_process_id: String) -> std::result::Result<Self, Self::Error> {
1999        const INVALID: &str = "invalid WorkerProcessId";
2000        let splits: Vec<&str> = worker_process_id.split(":").collect();
2001        if splits.len() != 2 {
2002            return Err(INVALID.to_owned());
2003        }
2004        let Ok(worker_id) = splits[0].parse::<u32>() else {
2005            return Err(INVALID.to_owned());
2006        };
2007        let Ok(process_id) = splits[1].parse::<i32>() else {
2008            return Err(INVALID.to_owned());
2009        };
2010        Ok(WorkerProcessId::new(worker_id.into(), process_id))
2011    }
2012}
2013
2014pub fn cancel_queries_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2015    let guard = sessions_map.read();
2016    if let Some(session) = guard.get(&session_id) {
2017        session.cancel_current_query();
2018        true
2019    } else {
2020        info!("Current session finished, ignoring cancel query request");
2021        false
2022    }
2023}
2024
2025pub fn cancel_creating_jobs_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
2026    let guard = sessions_map.read();
2027    if let Some(session) = guard.get(&session_id) {
2028        session.cancel_current_creating_job();
2029        true
2030    } else {
2031        info!("Current session finished, ignoring cancel creating request");
2032        false
2033    }
2034}