risingwave_frontend/
session.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::io::{Error, ErrorKind};
18use std::net::{IpAddr, Ipv4Addr, SocketAddr};
19use std::sync::atomic::{AtomicI32, Ordering};
20use std::sync::{Arc, Weak};
21use std::time::{Duration, Instant};
22
23use anyhow::anyhow;
24use bytes::Bytes;
25use either::Either;
26use itertools::Itertools;
27use parking_lot::{Mutex, RwLock, RwLockReadGuard};
28use pgwire::error::{PsqlError, PsqlResult};
29use pgwire::net::{Address, AddressRef};
30use pgwire::pg_field_descriptor::PgFieldDescriptor;
31use pgwire::pg_message::TransactionStatus;
32use pgwire::pg_response::{PgResponse, StatementType};
33use pgwire::pg_server::{
34    BoxedError, ExecContext, ExecContextGuard, Session, SessionId, SessionManager,
35    UserAuthenticator,
36};
37use pgwire::types::{Format, FormatIterator};
38use prometheus_http_query::Client as PrometheusClient;
39use rand::RngCore;
40use risingwave_batch::monitor::{BatchSpillMetrics, GLOBAL_BATCH_SPILL_METRICS};
41use risingwave_batch::spill::spill_op::SpillOp;
42use risingwave_batch::task::{ShutdownSender, ShutdownToken};
43use risingwave_batch::worker_manager::worker_node_manager::{
44    WorkerNodeManager, WorkerNodeManagerRef,
45};
46use risingwave_common::acl::AclMode;
47#[cfg(test)]
48use risingwave_common::catalog::{
49    DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
50};
51use risingwave_common::config::{
52    AuthMethod, BatchConfig, ConnectionType, FrontendConfig, MetaConfig, MetricLevel,
53    RpcClientConfig, StreamingConfig, UdfConfig, load_config,
54};
55use risingwave_common::id::WorkerId;
56use risingwave_common::memory::MemoryContext;
57use risingwave_common::secret::LocalSecretManager;
58use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
59use risingwave_common::system_param::local_manager::{
60    LocalSystemParamsManager, LocalSystemParamsManagerRef,
61};
62use risingwave_common::telemetry::manager::TelemetryManager;
63use risingwave_common::telemetry::telemetry_env_enabled;
64use risingwave_common::types::DataType;
65use risingwave_common::util::addr::HostAddr;
66use risingwave_common::util::cluster_limit;
67use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
68use risingwave_common::util::iter_util::ZipEqFast;
69use risingwave_common::util::pretty_bytes::convert;
70use risingwave_common::util::runtime::BackgroundShutdownRuntime;
71use risingwave_common::{GIT_SHA, RW_VERSION};
72use risingwave_common_heap_profiling::HeapProfiler;
73use risingwave_common_service::{MetricsManager, ObserverManager};
74use risingwave_connector::source::monitor::{GLOBAL_SOURCE_METRICS, SourceMetrics};
75use risingwave_pb::common::WorkerType;
76use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
77use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
78use risingwave_pb::health::health_server::HealthServer;
79use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
80use risingwave_pb::user::auth_info::EncryptionType;
81use risingwave_rpc_client::{
82    ComputeClientPool, ComputeClientPoolRef, FrontendClientPool, FrontendClientPoolRef, MetaClient,
83    MonitorClientPool, MonitorClientPoolRef,
84};
85use risingwave_sqlparser::ast::{ObjectName, Statement};
86use risingwave_sqlparser::parser::Parser;
87use thiserror::Error;
88use thiserror_ext::AsReport;
89use tokio::runtime::Builder;
90use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
91use tokio::sync::oneshot::Sender;
92use tokio::sync::watch;
93use tokio::task::JoinHandle;
94use tracing::{error, info};
95
96use self::cursor_manager::CursorManager;
97use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError};
98use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl};
99use crate::catalog::connection_catalog::ConnectionCatalog;
100use crate::catalog::root_catalog::{Catalog, SchemaPath};
101use crate::catalog::secret_catalog::SecretCatalog;
102use crate::catalog::source_catalog::SourceCatalog;
103use crate::catalog::subscription_catalog::SubscriptionCatalog;
104use crate::catalog::{
105    CatalogError, CatalogErrorInner, DatabaseId, OwnedByUserCatalog, SchemaId, TableId,
106    check_schema_writable,
107};
108use crate::error::{
109    ErrorCode, Result, RwError, bail_catalog_error, bail_permission_denied, bail_protocol_error,
110};
111use crate::handler::describe::infer_describe;
112use crate::handler::extended_handle::{
113    Portal, PrepareStatement, handle_bind, handle_execute, handle_parse,
114};
115use crate::handler::privilege::ObjectCheckItem;
116use crate::handler::show::{infer_show_create_object, infer_show_object};
117use crate::handler::util::to_pg_field;
118use crate::handler::variable::infer_show_variable;
119use crate::handler::{RwPgResponse, handle};
120use crate::health_service::HealthServiceImpl;
121use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl};
122use crate::monitor::{CursorMetrics, FrontendMetrics, GLOBAL_FRONTEND_METRICS};
123use crate::observer::FrontendObserverNode;
124use crate::rpc::{FrontendServiceImpl, MonitorServiceImpl};
125use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef};
126use crate::scheduler::{
127    DistributedQueryMetrics, GLOBAL_DISTRIBUTED_QUERY_METRICS, HummockSnapshotManager,
128    HummockSnapshotManagerRef, QueryManager,
129};
130use crate::telemetry::FrontendTelemetryCreator;
131use crate::user::UserId;
132use crate::user::user_authentication::md5_hash_with_salt;
133use crate::user::user_manager::UserInfoManager;
134use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl};
135use crate::{FrontendOpts, PgResponseStream, TableCatalog};
136
// Session-related submodules: thread-local tracking of the current session,
// cursor management for extended-protocol FETCH support, and transaction
// state handling.
pub(crate) mod current;
pub(crate) mod cursor_manager;
pub(crate) mod transaction;
140
/// The global environment for the frontend server.
///
/// Cloned into every [`SessionImpl`]; all fields are shared handles (`Arc`s or
/// cheaply-clonable refs), so cloning is inexpensive.
#[derive(Clone)]
pub(crate) struct FrontendEnv {
    // Different session may access catalog at the same time and catalog is protected by a
    // RwLock.
    /// RPC client to the meta node (trait object so tests can mock it).
    meta_client: Arc<dyn FrontendMetaClient>,
    /// Writer for catalog DDL; obtained via `catalog_writer()` which requires a write guard.
    catalog_writer: Arc<dyn CatalogWriter>,
    /// Read-only view of the catalog, kept up to date by the observer.
    catalog_reader: CatalogReader,
    /// Writer for user/privilege changes; obtained via `user_info_writer()` with a write guard.
    user_info_writer: Arc<dyn UserInfoWriter>,
    /// Read-only view of user info.
    user_info_reader: UserInfoReader,
    /// Tracks live compute worker nodes for query scheduling.
    worker_node_manager: WorkerNodeManagerRef,
    /// Manages distributed batch query execution.
    query_manager: QueryManager,
    /// Provides Hummock snapshots (read epochs) for queries.
    hummock_snapshot_manager: HummockSnapshotManagerRef,
    /// Local cache of cluster-wide system parameters.
    system_params_manager: LocalSystemParamsManagerRef,
    /// Cluster-default session parameters; initialized by the observer's first notification.
    session_params: Arc<RwLock<SessionConfig>>,

    /// This frontend's advertised address.
    server_addr: HostAddr,
    /// Pool of RPC clients to compute nodes.
    client_pool: ComputeClientPoolRef,
    /// Pool of RPC clients to other frontend nodes.
    frontend_client_pool: FrontendClientPoolRef,
    /// Pool of RPC clients for monitoring endpoints.
    monitor_client_pool: MonitorClientPoolRef,

    /// Each session is identified by (`process_id`,
    /// `secret_key`). When Cancel Request received, find corresponding session and cancel all
    /// running queries.
    sessions_map: SessionMapRef,

    pub frontend_metrics: Arc<FrontendMetrics>,

    pub cursor_metrics: Arc<CursorMetrics>,

    /// Source connector metrics.
    source_metrics: Arc<SourceMetrics>,

    /// Batch spill metrics
    spill_metrics: Arc<BatchSpillMetrics>,

    batch_config: BatchConfig,
    frontend_config: FrontendConfig,
    #[expect(dead_code)]
    meta_config: MetaConfig,
    streaming_config: StreamingConfig,
    udf_config: UdfConfig,

    /// Track creating streaming jobs, used to cancel creating streaming job when cancel request
    /// received.
    creating_streaming_job_tracker: StreamingJobTrackerRef,

    /// Runtime for compute intensive tasks in frontend, e.g. executors in local mode,
    /// root stage in mpp mode.
    compute_runtime: Arc<BackgroundShutdownRuntime>,

    /// Memory context used for batch executors in frontend.
    mem_context: MemoryContext,

    #[cfg(feature = "datafusion")]
    /// Shared budget context for **DataFusion** spillable operators.
    /// Created by [`crate::datafusion::execute::memory_ctx::create_df_spillable_budget_ctx`];
    /// caps total spillable usage so unspillable operators always have headroom.
    /// Not used by the RisingWave batch engine.
    df_spillable_budget_ctx: MemoryContext,

    /// address of the serverless backfill controller.
    serverless_backfill_controller_addr: String,

    /// Prometheus client for querying metrics.
    prometheus_client: Option<PrometheusClient>,

    /// The additional selector used when querying Prometheus.
    prometheus_selector: String,
}
210
/// Session map identified by `(process_id, secret_key)`
/// — the same pair the Postgres protocol carries in a CancelRequest message.
pub type SessionMapRef = Arc<RwLock<HashMap<(i32, i32), Arc<SessionImpl>>>>;

/// The proportion of frontend memory used for batch processing.
/// The remainder is left for catalog caches, session state, and other overhead.
const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5;
216
217impl FrontendEnv {
    /// Builds a fully-mocked `FrontendEnv` for unit tests: mock meta client,
    /// mock catalog/user writers backed by in-memory state, an empty worker
    /// node manager, and test-only metrics. No network connections are made.
    pub fn mock() -> Self {
        use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter};

        let catalog = Arc::new(RwLock::new(Catalog::default()));
        let meta_client = Arc::new(MockFrontendMetaClient {});
        let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
        let catalog_writer = Arc::new(MockCatalogWriter::new(
            catalog.clone(),
            hummock_snapshot_manager.clone(),
        ));
        let catalog_reader = CatalogReader::new(catalog);
        let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
        let user_info_writer = Arc::new(MockUserInfoWriter::new(user_info_manager.clone()));
        let user_info_reader = UserInfoReader::new(user_info_manager);
        let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
        let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
        let compute_client_pool = Arc::new(ComputeClientPool::for_test());
        let frontend_client_pool = Arc::new(FrontendClientPool::for_test());
        let monitor_client_pool = Arc::new(MonitorClientPool::for_test());
        let query_manager = QueryManager::new(
            worker_node_manager.clone(),
            compute_client_pool,
            catalog_reader.clone(),
            Arc::new(DistributedQueryMetrics::for_test()),
            None,
            None,
        );
        let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
        // NOTE(review): a second test pool is created here because the first
        // one was moved into `query_manager` above.
        let client_pool = Arc::new(ComputeClientPool::for_test());
        let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
        let runtime = {
            let mut builder = Builder::new_multi_thread();
            // Use the default config to decide the worker-thread count, same
            // knob as the real `init` path.
            if let Some(frontend_compute_runtime_worker_threads) =
                load_config("", FrontendOpts::default())
                    .batch
                    .frontend_compute_runtime_worker_threads
            {
                builder.worker_threads(frontend_compute_runtime_worker_threads);
            }
            builder
                .thread_name("rw-batch-local")
                .enable_all()
                .build()
                .unwrap()
        };
        let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
        let sessions_map = Arc::new(RwLock::new(HashMap::new()));
        Self {
            meta_client,
            catalog_writer,
            catalog_reader,
            user_info_writer,
            user_info_reader,
            worker_node_manager,
            query_manager,
            hummock_snapshot_manager,
            system_params_manager,
            session_params: Default::default(),
            server_addr,
            client_pool,
            frontend_client_pool,
            monitor_client_pool,
            sessions_map,
            frontend_metrics: Arc::new(FrontendMetrics::for_test()),
            cursor_metrics: Arc::new(CursorMetrics::for_test()),
            batch_config: BatchConfig::default(),
            frontend_config: FrontendConfig::default(),
            meta_config: MetaConfig::default(),
            streaming_config: StreamingConfig::default(),
            udf_config: UdfConfig::default(),
            source_metrics: Arc::new(SourceMetrics::default()),
            spill_metrics: BatchSpillMetrics::for_test(),
            creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
            compute_runtime,
            mem_context: MemoryContext::none(),
            #[cfg(feature = "datafusion")]
            df_spillable_budget_ctx: MemoryContext::none(),
            serverless_backfill_controller_addr: Default::default(),
            prometheus_client: None,
            prometheus_selector: String::new(),
        }
    }
300
    /// Bootstraps the frontend node: loads config, registers with the meta
    /// service, starts background services (heartbeat, observer, telemetry,
    /// metrics, RPC servers, idle-transaction monitor), and assembles the
    /// shared [`FrontendEnv`].
    ///
    /// Returns the env together with the `JoinHandle`s and shutdown `Sender`s
    /// of the spawned background tasks so the caller can manage their
    /// lifecycle.
    ///
    /// # Errors
    ///
    /// Fails if activation with the meta node or spill-directory cleanup
    /// fails. Note that malformed addresses in `opts` panic via `unwrap`.
    pub async fn init(opts: FrontendOpts) -> Result<(Self, Vec<JoinHandle<()>>, Vec<Sender<()>>)> {
        let config = load_config(&opts.config_path, &opts);
        info!("Starting frontend node");
        info!("> config: {:?}", config);
        info!(
            "> debug assertions: {}",
            if cfg!(debug_assertions) { "on" } else { "off" }
        );
        info!("> version: {} ({})", RW_VERSION, GIT_SHA);

        let frontend_address: HostAddr = opts
            .advertise_addr
            .as_ref()
            .unwrap_or_else(|| {
                tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
                &opts.listen_addr
            })
            .parse()
            .unwrap();
        info!("advertise addr is {}", frontend_address);

        let rpc_addr: HostAddr = opts.frontend_rpc_listener_addr.parse().unwrap();
        let internal_rpc_host_addr = HostAddr {
            // Use the host of advertise address for the frontend rpc address.
            host: frontend_address.host.clone(),
            port: rpc_addr.port,
        };
        // Register in meta by calling `AddWorkerNode` RPC.
        let (meta_client, system_params_reader) = MetaClient::register_new(
            opts.meta_addr,
            WorkerType::Frontend,
            &frontend_address,
            AddWorkerNodeProperty {
                internal_rpc_host_addr: internal_rpc_host_addr.to_string(),
                ..Default::default()
            },
            Arc::new(config.meta.clone()),
        )
        .await;

        let worker_id = meta_client.worker_id();
        info!("Assigned worker node id {}", worker_id);

        // Keep the registration alive; the handle/sender are returned to the caller.
        let (heartbeat_join_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
            meta_client.clone(),
            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
        );
        let mut join_handles = vec![heartbeat_join_handle];
        let mut shutdown_senders = vec![heartbeat_shutdown_sender];

        let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
        let hummock_snapshot_manager =
            Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));

        // Catalog reader/writer pair; the watch channel lets the writer wait
        // until the observer has applied a catalog update.
        let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
        let catalog = Arc::new(RwLock::new(Catalog::default()));
        let catalog_writer = Arc::new(CatalogWriterImpl::new(
            meta_client.clone(),
            catalog_updated_rx.clone(),
            hummock_snapshot_manager.clone(),
        ));
        let catalog_reader = CatalogReader::new(catalog.clone());

        let worker_node_manager = Arc::new(WorkerNodeManager::new());

        // RPC client pools and the distributed batch query manager.
        let compute_client_pool = Arc::new(ComputeClientPool::new(
            config.batch_exchange_connection_pool_size(),
            config.batch.developer.compute_client_config.clone(),
        ));
        let frontend_client_pool = Arc::new(FrontendClientPool::new(
            1,
            config.batch.developer.frontend_client_config.clone(),
        ));
        let monitor_client_pool = Arc::new(MonitorClientPool::new(1, RpcClientConfig::default()));
        let query_manager = QueryManager::new(
            worker_node_manager.clone(),
            compute_client_pool.clone(),
            catalog_reader.clone(),
            Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
            config.batch.distributed_query_limit,
            config.batch.max_batch_queries_per_frontend_node,
        );

        let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
        let user_info_reader = UserInfoReader::new(user_info_manager.clone());
        let user_info_writer = Arc::new(UserInfoWriterImpl::new(
            meta_client.clone(),
            catalog_updated_rx,
        ));

        let system_params_manager =
            Arc::new(LocalSystemParamsManager::new(system_params_reader.clone()));

        LocalSecretManager::init(
            opts.temp_secret_file_dir,
            meta_client.cluster_id().to_owned(),
            worker_id,
        );

        // This `session_params` should be initialized during the initial notification in `observer_manager`
        let session_params = Arc::new(RwLock::new(SessionConfig::default()));
        let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
        let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));

        // The observer keeps local caches (catalog, users, params, workers)
        // in sync with meta notifications; it must be running before `activate`.
        let frontend_observer_node = FrontendObserverNode::new(
            worker_node_manager.clone(),
            catalog,
            catalog_updated_tx,
            user_info_manager,
            hummock_snapshot_manager.clone(),
            system_params_manager.clone(),
            session_params.clone(),
            compute_client_pool.clone(),
        );
        let observer_manager =
            ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
                .await;
        let observer_join_handle = observer_manager.start().await;
        join_handles.push(observer_join_handle);

        meta_client.activate(&frontend_address).await?;

        let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
        let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
        let spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());

        if config.server.metrics_level > MetricLevel::Disabled {
            MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
        }

        let health_srv = HealthServiceImpl::new();
        let frontend_srv = FrontendServiceImpl::new(sessions_map.clone());
        let monitor_srv = MonitorServiceImpl::new(config.server.clone());
        // NOTE(review): re-parses `frontend_rpc_listener_addr`, which was
        // already parsed into `rpc_addr` above (as a `HostAddr` rather than a
        // `SocketAddr`) — the string was thus validated earlier.
        let frontend_rpc_addr = opts.frontend_rpc_listener_addr.parse().unwrap();

        let telemetry_manager = TelemetryManager::new(
            Arc::new(meta_client.clone()),
            Arc::new(FrontendTelemetryCreator::new()),
        );

        // if the toml config file or env variable disables telemetry, do not watch system params
        // change because if any of configs disable telemetry, we should never start it
        if config.server.telemetry_enabled && telemetry_env_enabled() {
            let (join_handle, shutdown_sender) = telemetry_manager.start().await;
            join_handles.push(join_handle);
            shutdown_senders.push(shutdown_sender);
        } else {
            tracing::info!("Telemetry didn't start due to config");
        }

        // Serve health / frontend / monitor gRPC services. This task is
        // detached intentionally: it lives for the whole process.
        tokio::spawn(async move {
            tonic::transport::Server::builder()
                .add_service(HealthServer::new(health_srv))
                .add_service(FrontendServiceServer::new(frontend_srv))
                .add_service(MonitorServiceServer::new(monitor_srv))
                .serve(frontend_rpc_addr)
                .await
                .unwrap();
        });
        info!(
            "Health Check RPC Listener is set up on {}",
            opts.frontend_rpc_listener_addr.clone()
        );

        let creating_streaming_job_tracker =
            Arc::new(StreamingJobTracker::new(frontend_meta_client.clone()));

        let runtime = {
            let mut builder = Builder::new_multi_thread();
            if let Some(frontend_compute_runtime_worker_threads) =
                config.batch.frontend_compute_runtime_worker_threads
            {
                builder.worker_threads(frontend_compute_runtime_worker_threads);
            }
            builder
                .thread_name("rw-batch-local")
                .enable_all()
                .build()
                .unwrap()
        };
        let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));

        let sessions = sessions_map.clone();
        // Idle transaction background monitor
        let join_handle = tokio::spawn(async move {
            let mut check_idle_txn_interval =
                tokio::time::interval(core::time::Duration::from_secs(10));
            check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            check_idle_txn_interval.reset();
            loop {
                check_idle_txn_interval.tick().await;
                sessions.read().values().for_each(|session| {
                    let _ = session.check_idle_in_transaction_timeout();
                })
            }
        });
        join_handles.push(join_handle);

        // Clean up the spill directory.
        #[cfg(not(madsim))]
        if config.batch.enable_spill {
            SpillOp::clean_spill_directory()
                .await
                .map_err(|err| anyhow!(err))?;
        }

        let total_memory_bytes = opts.frontend_total_memory_bytes;
        let heap_profiler =
            HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
        // Run a background heap profiler
        heap_profiler.start();

        let batch_memory_limit = total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION;
        let mem_context = MemoryContext::root(
            frontend_metrics.batch_total_mem.clone(),
            batch_memory_limit as u64,
        );
        #[cfg(feature = "datafusion")]
        let df_spillable_budget_ctx =
            crate::datafusion::create_df_spillable_budget_ctx(&mem_context);

        // Initialize Prometheus client if endpoint is provided
        let prometheus_client = if let Some(ref endpoint) = opts.prometheus_endpoint {
            match PrometheusClient::try_from(endpoint.as_str()) {
                Ok(client) => {
                    info!("Prometheus client initialized with endpoint: {}", endpoint);
                    Some(client)
                }
                Err(e) => {
                    // Best-effort: metrics queries are optional, so a bad
                    // endpoint degrades to `None` instead of failing startup.
                    error!(
                        error = %e.as_report(),
                        "failed to initialize Prometheus client",
                    );
                    None
                }
            }
        } else {
            None
        };

        // Initialize Prometheus selector
        let prometheus_selector = opts.prometheus_selector.unwrap_or_default();

        info!(
            "Frontend  total_memory: {} batch_memory: {}",
            convert(total_memory_bytes as _),
            convert(batch_memory_limit as _),
        );

        Ok((
            Self {
                catalog_reader,
                catalog_writer,
                user_info_reader,
                user_info_writer,
                worker_node_manager,
                meta_client: frontend_meta_client,
                query_manager,
                hummock_snapshot_manager,
                system_params_manager,
                session_params,
                server_addr: frontend_address,
                client_pool: compute_client_pool,
                frontend_client_pool,
                monitor_client_pool,
                frontend_metrics,
                cursor_metrics,
                spill_metrics,
                sessions_map,
                batch_config: config.batch,
                frontend_config: config.frontend,
                meta_config: config.meta,
                streaming_config: config.streaming,
                serverless_backfill_controller_addr: opts.serverless_backfill_controller_addr,
                udf_config: config.udf,
                source_metrics,
                creating_streaming_job_tracker,
                compute_runtime,
                mem_context,
                #[cfg(feature = "datafusion")]
                df_spillable_budget_ctx,
                prometheus_client,
                prometheus_selector,
            },
            join_handles,
            shutdown_senders,
        ))
    }
589
590    /// Get a reference to the frontend env's catalog writer.
591    ///
592    /// This method is intentionally private, and a write guard is required for the caller to
593    /// prove that the write operations are permitted in the current transaction.
594    fn catalog_writer(&self, _guard: transaction::WriteGuard) -> &dyn CatalogWriter {
595        &*self.catalog_writer
596    }
597
    /// Get a reference to the frontend env's catalog reader.
    pub fn catalog_reader(&self) -> &CatalogReader {
        &self.catalog_reader
    }

    /// Get a reference to the frontend env's user info writer.
    ///
    /// This method is intentionally private, and a write guard is required for the caller to
    /// prove that the write operations are permitted in the current transaction.
    fn user_info_writer(&self, _guard: transaction::WriteGuard) -> &dyn UserInfoWriter {
        &*self.user_info_writer
    }

    /// Get a reference to the frontend env's user info reader.
    pub fn user_info_reader(&self) -> &UserInfoReader {
        &self.user_info_reader
    }

    /// Get a shared handle to the worker node manager.
    pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef {
        self.worker_node_manager.clone()
    }

    /// Get a reference to the meta client.
    pub fn meta_client(&self) -> &dyn FrontendMetaClient {
        &*self.meta_client
    }

    /// Get a shared handle to the meta client.
    pub fn meta_client_ref(&self) -> Arc<dyn FrontendMetaClient> {
        self.meta_client.clone()
    }

    /// Get a reference to the batch query manager.
    pub fn query_manager(&self) -> &QueryManager {
        &self.query_manager
    }

    /// Get a reference to the Hummock snapshot manager.
    pub fn hummock_snapshot_manager(&self) -> &HummockSnapshotManagerRef {
        &self.hummock_snapshot_manager
    }

    /// Get a reference to the local system params manager.
    pub fn system_params_manager(&self) -> &LocalSystemParamsManagerRef {
        &self.system_params_manager
    }

    /// Take a snapshot of the cluster-default session parameters.
    /// Uses `read_recursive` to avoid deadlock if the current thread already
    /// holds the read lock.
    pub fn session_params_snapshot(&self) -> SessionConfig {
        self.session_params.read_recursive().clone()
    }

    /// Address of the serverless backfill controller.
    // NOTE(review): returning `&String` — `&str` would be more idiomatic, but
    // changing the return type could break callers that rely on `&String`.
    pub fn sbc_address(&self) -> &String {
        &self.serverless_backfill_controller_addr
    }

    /// Get a reference to the Prometheus client if available.
    pub fn prometheus_client(&self) -> Option<&PrometheusClient> {
        self.prometheus_client.as_ref()
    }

    /// Get the Prometheus selector string.
    pub fn prometheus_selector(&self) -> &str {
        &self.prometheus_selector
    }

    /// The advertised address of this frontend node.
    pub fn server_address(&self) -> &HostAddr {
        &self.server_addr
    }

    /// Get a shared handle to the compute-node client pool.
    pub fn client_pool(&self) -> ComputeClientPoolRef {
        self.client_pool.clone()
    }

    /// Get a shared handle to the frontend client pool.
    pub fn frontend_client_pool(&self) -> FrontendClientPoolRef {
        self.frontend_client_pool.clone()
    }

    /// Get a shared handle to the monitor client pool.
    pub fn monitor_client_pool(&self) -> MonitorClientPoolRef {
        self.monitor_client_pool.clone()
    }

    /// Batch engine configuration.
    pub fn batch_config(&self) -> &BatchConfig {
        &self.batch_config
    }

    /// Frontend-specific configuration.
    pub fn frontend_config(&self) -> &FrontendConfig {
        &self.frontend_config
    }

    /// Streaming engine configuration.
    pub fn streaming_config(&self) -> &StreamingConfig {
        &self.streaming_config
    }

    /// UDF configuration.
    pub fn udf_config(&self) -> &UdfConfig {
        &self.udf_config
    }

    /// Get a shared handle to the source connector metrics.
    pub fn source_metrics(&self) -> Arc<SourceMetrics> {
        self.source_metrics.clone()
    }

    /// Get a shared handle to the batch spill metrics.
    pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
        self.spill_metrics.clone()
    }

    /// Tracker for streaming jobs that are still being created.
    pub fn creating_streaming_job_tracker(&self) -> &StreamingJobTrackerRef {
        &self.creating_streaming_job_tracker
    }

    /// Map of all live sessions, keyed by `(process_id, secret_key)`.
    pub fn sessions_map(&self) -> &SessionMapRef {
        &self.sessions_map
    }

    /// Runtime for compute-intensive frontend tasks (local-mode executors,
    /// root stage in mpp mode).
    pub fn compute_runtime(&self) -> Arc<BackgroundShutdownRuntime> {
        self.compute_runtime.clone()
    }

    /// Cancel queries (i.e. batch queries) in session.
    /// If the session exists return true, otherwise, return false.
    pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {
        cancel_queries_in_session(session_id, self.sessions_map.clone())
    }

    /// Cancel creating jobs (i.e. streaming queries) in session.
    /// If the session exists return true, otherwise, return false.
    pub fn cancel_creating_jobs_in_session(&self, session_id: SessionId) -> bool {
        cancel_creating_jobs_in_session(session_id, self.sessions_map.clone())
    }

    /// Root memory context for batch executors on this frontend.
    pub fn mem_context(&self) -> MemoryContext {
        self.mem_context.clone()
    }

    /// Shared budget context for DataFusion spillable operators.
    #[cfg(feature = "datafusion")]
    pub fn df_spillable_budget_ctx(&self) -> MemoryContext {
        self.df_spillable_budget_ctx.clone()
    }
}
731
/// Authentication context of a session: which database it is connected to and
/// which user it runs as. Stored behind a lock in [`SessionImpl`] so it can be
/// updated (e.g. on `SET ROLE`-like operations) after connection.
#[derive(Clone)]
pub struct AuthContext {
    /// Name of the database the session is connected to.
    pub database: String,
    /// Name of the session user.
    pub user_name: String,
    /// Resolved id of the session user.
    pub user_id: UserId,
}
738
739impl AuthContext {
740    pub fn new(database: String, user_name: String, user_id: UserId) -> Self {
741        Self {
742            database,
743            user_name,
744            user_id,
745        }
746    }
747}
/// A single client session on this frontend node, created per pgwire
/// connection and registered in [`FrontendEnv::sessions_map`].
pub struct SessionImpl {
    /// Shared frontend environment (catalog, clients, metrics, ...).
    env: FrontendEnv,
    /// Current database/user of this session; lock allows in-place updates.
    auth_context: Arc<RwLock<AuthContext>>,
    /// Used for user authentication.
    user_authenticator: UserAuthenticator,
    /// Stores the value of configurations.
    config_map: Arc<RwLock<SessionConfig>>,

    /// Channel sender for frontend handler to send notices.
    notice_tx: UnboundedSender<String>,
    /// Channel receiver for pgwire to take notices and send to clients.
    notice_rx: Mutex<UnboundedReceiver<String>>,

    /// Identified by `process_id`, `secret_key`. Corresponds to `SessionManager`.
    id: (i32, i32),

    /// Client address
    peer_addr: AddressRef,

    /// Transaction state.
    /// TODO: get rid of the `Mutex` here — it is a workaround for the `Send`
    /// requirement of async functions; there should actually be no contention.
    txn: Arc<Mutex<transaction::State>>,

    /// Query cancel flag.
    /// This flag is set only when current query is executed in local mode, and used to cancel
    /// local query.
    current_query_cancel_flag: Mutex<Option<ShutdownSender>>,

    /// execution context represents the lifetime of a running SQL in the current session
    exec_context: Mutex<Option<Weak<ExecContext>>>,

    /// Last idle instant
    last_idle_instant: Arc<Mutex<Option<Instant>>>,

    /// Manages declared cursors for FETCH support.
    cursor_manager: Arc<CursorManager>,

    /// temporary sources for the current session
    temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,

    /// staging catalogs for the current session
    staging_catalog_manager: Arc<Mutex<StagingCatalogManager>>,
}
791
/// If TEMPORARY or TEMP is specified, the source is created as a temporary source.
/// Temporary sources are automatically dropped at the end of a session
/// Temporary sources are expected to be selected by batch queries, not streaming queries.
/// Temporary sources currently are only used by cloud portal to preview the data during table and
/// source creation, so it is an internal feature and not exposed to users.
/// The current PR supports temporary source with minimum effort,
/// so we don't care about the database name and schema name, but only care about the source name.
/// Temporary sources can only be shown via `show sources` command but not other system tables.
#[derive(Default, Clone)]
pub struct TemporarySourceManager {
    // Session-local sources keyed by bare source name (no db/schema qualification).
    sources: HashMap<String, SourceCatalog>,
}
804
805impl TemporarySourceManager {
806    pub fn new() -> Self {
807        Self {
808            sources: HashMap::new(),
809        }
810    }
811
812    pub fn create_source(&mut self, name: String, source: SourceCatalog) {
813        self.sources.insert(name, source);
814    }
815
816    pub fn drop_source(&mut self, name: &str) {
817        self.sources.remove(name);
818    }
819
820    pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
821        self.sources.get(name)
822    }
823
824    pub fn keys(&self) -> Vec<String> {
825        self.sources.keys().cloned().collect()
826    }
827}
828
/// Staging catalog manager is used to manage the tables creating in the current session.
#[derive(Default, Clone)]
pub struct StagingCatalogManager {
    // staging tables creating in the current session, keyed by table name.
    tables: HashMap<String, TableCatalog>,
}
835
836impl StagingCatalogManager {
837    pub fn new() -> Self {
838        Self {
839            tables: HashMap::new(),
840        }
841    }
842
843    pub fn create_table(&mut self, name: String, table: TableCatalog) {
844        self.tables.insert(name, table);
845    }
846
847    pub fn drop_table(&mut self, name: &str) {
848        self.tables.remove(name);
849    }
850
851    pub fn get_table(&self, name: &str) -> Option<&TableCatalog> {
852        self.tables.get(name)
853    }
854}
855
/// Errors that can occur while checking whether a relation name is already taken:
/// either the qualified name fails to resolve, or the catalog lookup itself errors.
#[derive(Error, Debug)]
pub enum CheckRelationError {
    #[error("{0}")]
    Resolve(#[from] ResolveQualifiedNameError),
    #[error("{0}")]
    Catalog(#[from] CatalogError),
}
863
864impl From<CheckRelationError> for RwError {
865    fn from(e: CheckRelationError) -> Self {
866        match e {
867            CheckRelationError::Resolve(e) => e.into(),
868            CheckRelationError::Catalog(e) => e.into(),
869        }
870    }
871}
872
873impl SessionImpl {
874    pub(crate) fn new(
875        env: FrontendEnv,
876        auth_context: AuthContext,
877        user_authenticator: UserAuthenticator,
878        id: SessionId,
879        peer_addr: AddressRef,
880        session_config: SessionConfig,
881    ) -> Self {
882        let cursor_metrics = env.cursor_metrics.clone();
883        let (notice_tx, notice_rx) = mpsc::unbounded_channel();
884
885        Self {
886            env,
887            auth_context: Arc::new(RwLock::new(auth_context)),
888            user_authenticator,
889            config_map: Arc::new(RwLock::new(session_config)),
890            id,
891            peer_addr,
892            txn: Default::default(),
893            current_query_cancel_flag: Mutex::new(None),
894            notice_tx,
895            notice_rx: Mutex::new(notice_rx),
896            exec_context: Mutex::new(None),
897            last_idle_instant: Default::default(),
898            cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
899            temporary_source_manager: Default::default(),
900            staging_catalog_manager: Default::default(),
901        }
902    }
903
904    #[cfg(test)]
905    pub fn mock() -> Self {
906        let env = FrontendEnv::mock();
907        let (notice_tx, notice_rx) = mpsc::unbounded_channel();
908
909        Self {
910            env: FrontendEnv::mock(),
911            auth_context: Arc::new(RwLock::new(AuthContext::new(
912                DEFAULT_DATABASE_NAME.to_owned(),
913                DEFAULT_SUPER_USER.to_owned(),
914                DEFAULT_SUPER_USER_ID,
915            ))),
916            user_authenticator: UserAuthenticator::None,
917            config_map: Default::default(),
918            // Mock session use non-sense id.
919            id: (0, 0),
920            txn: Default::default(),
921            current_query_cancel_flag: Mutex::new(None),
922            notice_tx,
923            notice_rx: Mutex::new(notice_rx),
924            exec_context: Mutex::new(None),
925            peer_addr: Address::Tcp(SocketAddr::new(
926                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
927                8080,
928            ))
929            .into(),
930            last_idle_instant: Default::default(),
931            cursor_manager: Arc::new(CursorManager::new(env.cursor_metrics)),
932            temporary_source_manager: Default::default(),
933            staging_catalog_manager: Default::default(),
934        }
935    }
936
    /// The shared frontend environment this session belongs to.
    pub(crate) fn env(&self) -> &FrontendEnv {
        &self.env
    }
940
    /// Snapshot of the current auth context.
    /// Note: clones the context into a fresh `Arc` on every call, so the
    /// returned value does not observe later `update_database` changes.
    pub fn auth_context(&self) -> Arc<AuthContext> {
        let ctx = self.auth_context.read();
        Arc::new(ctx.clone())
    }
945
    /// Name of the database this session is currently connected to.
    pub fn database(&self) -> String {
        self.auth_context.read().database.clone()
    }
949
    /// Resolve the current database's id from the catalog snapshot.
    ///
    /// Panics (via `expect`) if the session's database is no longer present
    /// in the catalog, e.g. if it was dropped concurrently.
    pub fn database_id(&self) -> DatabaseId {
        let db_name = self.database();
        self.env
            .catalog_reader()
            .read_guard()
            .get_database_by_name(&db_name)
            .map(|db| db.id())
            .expect("session database not found")
    }
959
    /// Name of the authenticated user.
    pub fn user_name(&self) -> String {
        self.auth_context.read().user_name.clone()
    }
963
    /// Catalog id of the authenticated user.
    pub fn user_id(&self) -> UserId {
        self.auth_context.read().user_id
    }
967
    /// Switch the session to another database (e.g. on `\connect`-style requests).
    pub fn update_database(&self, database: String) {
        self.auth_context.write().database = database;
    }
971
972    pub fn shared_config(&self) -> Arc<RwLock<SessionConfig>> {
973        Arc::clone(&self.config_map)
974    }
975
    /// Read guard over the session configuration.
    /// Do not hold the guard across `.await` points.
    pub fn config(&self) -> RwLockReadGuard<'_, SessionConfig> {
        self.config_map.read()
    }
979
    /// Set a session config entry, returning the resulting value as a string.
    pub fn set_config(&self, key: &str, value: String) -> Result<String> {
        self.config_map
            .write()
            .set(key, value, &mut ())
            .map_err(Into::into)
    }
986
    /// Reset a session config entry to its default, returning the new value.
    pub fn reset_config(&self, key: &str) -> Result<String> {
        self.config_map
            .write()
            .reset(key, &mut ())
            .map_err(Into::into)
    }
993
994    pub fn set_config_report(
995        &self,
996        key: &str,
997        value: Option<String>,
998        mut reporter: impl ConfigReporter,
999    ) -> Result<String> {
1000        if let Some(value) = value {
1001            self.config_map
1002                .write()
1003                .set(key, value, &mut reporter)
1004                .map_err(Into::into)
1005        } else {
1006            self.config_map
1007                .write()
1008                .reset(key, &mut reporter)
1009                .map_err(Into::into)
1010        }
1011    }
1012
    /// The `(process_id, secret_key)` pair identifying this session.
    pub fn session_id(&self) -> SessionId {
        self.id
    }
1016
    /// SQL text of the statement currently executing in this session, if any.
    /// Returns `None` when idle or when the exec context has already been dropped.
    pub fn running_sql(&self) -> Option<Arc<str>> {
        self.exec_context
            .lock()
            .as_ref()
            .and_then(|weak| weak.upgrade())
            .map(|context| context.running_sql.clone())
    }
1024
1025    pub fn get_cursor_manager(&self) -> Arc<CursorManager> {
1026        self.cursor_manager.clone()
1027    }
1028
    /// Address of the connected client.
    pub fn peer_addr(&self) -> &Address {
        &self.peer_addr
    }
1032
    /// Milliseconds elapsed since the currently running statement started,
    /// or `None` when no statement is running.
    pub fn elapse_since_running_sql(&self) -> Option<u128> {
        self.exec_context
            .lock()
            .as_ref()
            .and_then(|weak| weak.upgrade())
            .map(|context| context.last_instant.elapsed().as_millis())
    }
1040
1041    pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
1042        self.last_idle_instant
1043            .lock()
1044            .as_ref()
1045            .map(|x| x.elapsed().as_millis())
1046    }
1047
    /// Check whether creating a relation named `name` would collide with an
    /// existing relation in the target schema.
    ///
    /// Returns `Either::Left(())` when creation may proceed, or
    /// `Either::Right(response)` when `if_not_exists` is set and the statement
    /// should finish early with a notice.
    pub fn check_relation_name_duplicated(
        &self,
        name: ObjectName,
        stmt_type: StatementType,
        if_not_exists: bool,
    ) -> std::result::Result<Either<(), RwPgResponse>, CheckRelationError> {
        let db_name = &self.database();
        let catalog_reader = self.env().catalog_reader().read_guard();
        // Resolve an unqualified name against the first valid schema in the
        // session's search path.
        let (schema_name, relation_name) = {
            let (schema_name, relation_name) =
                Binder::resolve_schema_qualified_name(db_name, &name)?;
            let search_path = self.config().search_path();
            let user_name = &self.user_name();
            let schema_name = match schema_name {
                Some(schema_name) => schema_name,
                None => catalog_reader
                    .first_valid_schema(db_name, &search_path, user_name)?
                    .name(),
            };
            (schema_name, relation_name)
        };
        match catalog_reader.check_relation_name_duplicated(db_name, &schema_name, &relation_name) {
            Err(e) if if_not_exists => {
                if let CatalogErrorInner::Duplicated {
                    name,
                    under_creation,
                    ..
                } = e.inner()
                {
                    // If relation is created, return directly.
                    // Otherwise, the job status is `is_creating`. Since frontend receives the catalog asynchronously, we can't
                    // determine the real status of the meta at this time. We regard it as `not_exists` and delay the check to meta.
                    // Only the type in StreamingJob (defined in streaming_job.rs) and Subscription may be `is_creating`.
                    if !*under_creation {
                        Ok(Either::Right(
                            PgResponse::builder(stmt_type)
                                .notice(format!("relation \"{}\" already exists, skipping", name))
                                .into(),
                        ))
                    } else if stmt_type == StatementType::CREATE_SUBSCRIPTION {
                        // For now, when a Subscription is creating, we return directly with an additional message.
                        // TODO: Subscription should also be processed in the same way as StreamingJob.
                        Ok(Either::Right(
                            PgResponse::builder(stmt_type)
                                .notice(format!(
                                    "relation \"{}\" already exists but still creating, skipping",
                                    name
                                ))
                                .into(),
                        ))
                    } else {
                        Ok(Either::Left(()))
                    }
                } else {
                    Err(e.into())
                }
            }
            Err(e) => Err(e.into()),
            Ok(_) => Ok(Either::Left(())),
        }
    }
1109
    /// Error if a secret with the given (possibly schema-qualified) name
    /// already exists in the current database.
    pub fn check_secret_name_duplicated(&self, name: ObjectName) -> Result<()> {
        let db_name = &self.database();
        let catalog_reader = self.env().catalog_reader().read_guard();
        // Unqualified names resolve against the first valid schema in the search path.
        let (schema_name, secret_name) = {
            let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
            let search_path = self.config().search_path();
            let user_name = &self.user_name();
            let schema_name = match schema_name {
                Some(schema_name) => schema_name,
                None => catalog_reader
                    .first_valid_schema(db_name, &search_path, user_name)?
                    .name(),
            };
            (schema_name, secret_name)
        };
        catalog_reader
            .check_secret_name_duplicated(db_name, &schema_name, &secret_name)
            .map_err(RwError::from)
    }
1129
    /// Error if a connection with the given (possibly schema-qualified) name
    /// already exists in the current database.
    pub fn check_connection_name_duplicated(&self, name: ObjectName) -> Result<()> {
        let db_name = &self.database();
        let catalog_reader = self.env().catalog_reader().read_guard();
        // Unqualified names resolve against the first valid schema in the search path.
        let (schema_name, connection_name) = {
            let (schema_name, connection_name) =
                Binder::resolve_schema_qualified_name(db_name, &name)?;
            let search_path = self.config().search_path();
            let user_name = &self.user_name();
            let schema_name = match schema_name {
                Some(schema_name) => schema_name,
                None => catalog_reader
                    .first_valid_schema(db_name, &search_path, user_name)?
                    .name(),
            };
            (schema_name, connection_name)
        };
        catalog_reader
            .check_connection_name_duplicated(db_name, &schema_name, &connection_name)
            .map_err(RwError::from)
    }
1150
    /// Check whether a function with the same name *and argument types*
    /// already exists (functions are overloadable, so the name alone is not
    /// enough).
    ///
    /// Returns `Either::Left(())` when creation may proceed, or
    /// `Either::Right(response)` when `if_not_exists` is set and the statement
    /// should finish early with a notice.
    pub fn check_function_name_duplicated(
        &self,
        stmt_type: StatementType,
        name: ObjectName,
        arg_types: &[DataType],
        if_not_exists: bool,
    ) -> Result<Either<(), RwPgResponse>> {
        let db_name = &self.database();
        let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
        let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?;

        let catalog_reader = self.env().catalog_reader().read_guard();
        if catalog_reader
            .get_schema_by_id(database_id, schema_id)?
            .get_function_by_name_args(&function_name, arg_types)
            .is_some()
        {
            // e.g. `foo(int,varchar)` — used both in the notice and the error.
            let full_name = format!(
                "{function_name}({})",
                arg_types.iter().map(|t| t.to_string()).join(",")
            );
            if if_not_exists {
                Ok(Either::Right(
                    PgResponse::builder(stmt_type)
                        .notice(format!(
                            "function \"{}\" already exists, skipping",
                            full_name
                        ))
                        .into(),
                ))
            } else {
                Err(CatalogError::duplicated("function", full_name).into())
            }
        } else {
            Ok(Either::Left(()))
        }
    }
1188
    /// Resolve the `(database_id, schema_id)` pair a new object should be
    /// created in, defaulting to the first valid schema in the search path
    /// when `schema_name` is `None`.
    ///
    /// Also check if the user has the privilege to create in the schema.
    pub fn get_database_and_schema_id_for_create(
        &self,
        schema_name: Option<String>,
    ) -> Result<(DatabaseId, SchemaId)> {
        let db_name = &self.database();

        let search_path = self.config().search_path();
        let user_name = &self.user_name();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let schema = match schema_name {
            Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
            None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
        };
        let schema_name = schema.name();

        // Reject creation in read-only / system schemas before the ACL check.
        check_schema_writable(&schema_name)?;
        self.check_privileges(&[ObjectCheckItem::new(
            schema.owner(),
            AclMode::Create,
            schema_name,
            schema.id(),
        )])?;

        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
        Ok((db_id, schema.id()))
    }
1217
    /// Look up a connection by name (via the search path when `schema_name` is
    /// `None`) and verify the user has USAGE privilege on it.
    pub fn get_connection_by_name(
        &self,
        schema_name: Option<String>,
        connection_name: &str,
    ) -> Result<Arc<ConnectionCatalog>> {
        let db_name = &self.database();
        let search_path = self.config().search_path();
        let user_name = &self.user_name();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
        let (connection, _) =
            catalog_reader.get_connection_by_name(db_name, schema_path, connection_name)?;

        self.check_privileges(&[ObjectCheckItem::new(
            connection.owner(),
            AclMode::Usage,
            connection.name.clone(),
            connection.id,
        )])?;

        Ok(connection.clone())
    }
1241
    /// Look up a subscription by its containing schema id and name.
    /// No privilege check is performed here, unlike the name-based lookups.
    pub fn get_subscription_by_schema_id_name(
        &self,
        schema_id: SchemaId,
        subscription_name: &str,
    ) -> Result<Arc<SubscriptionCatalog>> {
        let db_name = &self.database();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
        let schema = catalog_reader.get_schema_by_id(db_id, schema_id)?;
        let subscription = schema
            .get_subscription_by_name(subscription_name)
            .ok_or_else(|| {
                RwError::from(ErrorCode::ItemNotFound(format!(
                    "subscription {} not found",
                    subscription_name
                )))
            })?;
        Ok(subscription.clone())
    }
1262
    /// Look up a subscription by name, resolving an omitted schema through the
    /// session's search path.
    pub fn get_subscription_by_name(
        &self,
        schema_name: Option<String>,
        subscription_name: &str,
    ) -> Result<Arc<SubscriptionCatalog>> {
        let db_name = &self.database();
        let search_path = self.config().search_path();
        let user_name = &self.user_name();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
        let (subscription, _) =
            catalog_reader.get_subscription_by_name(db_name, schema_path, subscription_name)?;
        Ok(subscription.clone())
    }
1278
    /// Fetch a table catalog by id, regardless of its schema or creation state.
    pub fn get_table_by_id(&self, table_id: TableId) -> Result<Arc<TableCatalog>> {
        let catalog_reader = self.env().catalog_reader().read_guard();
        Ok(catalog_reader.get_any_table_by_id(table_id)?.clone())
    }
1283
    /// Fetch a *created* table by name within the given database and schema,
    /// verifying the user has SELECT privilege on it.
    ///
    /// NOTE(review): the not-found case is reported as a `std::io::Error`
    /// (`InvalidInput`) while sibling lookups use catalog errors — looks
    /// inconsistent; confirm whether callers depend on this error type.
    pub fn get_table_by_name(
        &self,
        table_name: &str,
        db_id: DatabaseId,
        schema_id: SchemaId,
    ) -> Result<Arc<TableCatalog>> {
        let catalog_reader = self.env().catalog_reader().read_guard();
        let table = catalog_reader
            .get_schema_by_id(db_id, schema_id)?
            .get_created_table_by_name(table_name)
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::InvalidInput,
                    format!("table \"{}\" does not exist", table_name),
                )
            })?;

        self.check_privileges(&[ObjectCheckItem::new(
            table.owner(),
            AclMode::Select,
            table_name.to_owned(),
            table.id,
        )])?;

        Ok(table.clone())
    }
1310
    /// Look up a secret by name (via the search path when `schema_name` is
    /// `None`) and verify the user has USAGE privilege on it.
    pub fn get_secret_by_name(
        &self,
        schema_name: Option<String>,
        secret_name: &str,
    ) -> Result<Arc<SecretCatalog>> {
        let db_name = &self.database();
        let search_path = self.config().search_path();
        let user_name = &self.user_name();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
        let (secret, _) = catalog_reader.get_secret_by_name(db_name, schema_path, secret_name)?;

        self.check_privileges(&[ObjectCheckItem::new(
            secret.owner(),
            AclMode::Usage,
            secret.name.clone(),
            secret.id,
        )])?;

        Ok(secret.clone())
    }
1333
    /// List up to `max_count` change-log epochs of `table_id` starting from
    /// `min_epoch`, based on the currently pinned hummock snapshot.
    pub fn list_change_log_epochs(
        &self,
        table_id: TableId,
        min_epoch: u64,
        max_count: u32,
    ) -> Result<Vec<u64>> {
        Ok(self
            .env
            .hummock_snapshot_manager()
            .acquire()
            .list_change_log_epochs(table_id, min_epoch, max_count))
    }
1346
1347    pub fn clear_cancel_query_flag(&self) {
1348        let mut flag = self.current_query_cancel_flag.lock();
1349        *flag = None;
1350    }
1351
1352    pub fn reset_cancel_query_flag(&self) -> ShutdownToken {
1353        let mut flag = self.current_query_cancel_flag.lock();
1354        let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
1355        *flag = Some(shutdown_tx);
1356        shutdown_rx
1357    }
1358
    /// Cancel the query currently running in this session.
    /// A set cancel flag means the query runs in local mode and is cancelled
    /// directly; otherwise the distributed query manager is asked to cancel.
    pub fn cancel_current_query(&self) {
        let mut flag_guard = self.current_query_cancel_flag.lock();
        if let Some(sender) = flag_guard.take() {
            info!("Trying to cancel query in local mode.");
            // Current running query is in local mode
            sender.cancel();
            info!("Cancel query request sent.");
        } else {
            info!("Trying to cancel query in distributed mode.");
            self.env.query_manager().cancel_queries_in_session(self.id)
        }
    }
1371
    /// Abort the streaming jobs this session is currently creating.
    pub fn cancel_current_creating_job(&self) {
        self.env.creating_streaming_job_tracker.abort_jobs(self.id);
    }
1375
    /// Parse and execute a single SQL statement in this session.
    /// Multiple statements in one string are rejected with a notice.
    ///
    /// This function only used for test now.
    /// Maybe we can remove it in the future.
    pub async fn run_statement(
        self: Arc<Self>,
        sql: Arc<str>,
        formats: Vec<Format>,
    ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
        // Parse sql.
        let mut stmts = Parser::parse_sql(&sql)?;
        if stmts.is_empty() {
            return Ok(PgResponse::empty_result(
                pgwire::pg_response::StatementType::EMPTY,
            ));
        }
        if stmts.len() > 1 {
            return Ok(
                PgResponse::builder(pgwire::pg_response::StatementType::EMPTY)
                    .notice("cannot insert multiple commands into statement")
                    .into(),
            );
        }
        let stmt = stmts.swap_remove(0);
        let rsp = handle(self, stmt, sql.clone(), formats).await?;
        Ok(rsp)
    }
1401
    /// Queue a notice message for pgwire to deliver to the client.
    ///
    /// Panics if the notice channel is closed, which would indicate the
    /// receiving half owned by this same session was dropped — a bug.
    pub fn notice_to_user(&self, str: impl Into<String>) {
        let notice = str.into();
        tracing::trace!(notice, "notice to user");
        self.notice_tx
            .send(notice)
            .expect("notice channel should not be closed");
    }
1409
    /// Whether batch reads should see un-checkpointed (barrier-level) data,
    /// derived from the session's visibility mode with the batch config as
    /// the default.
    pub fn is_barrier_read(&self) -> bool {
        match self.config().visibility_mode() {
            VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
            VisibilityMode::All => true,
            VisibilityMode::Checkpoint => false,
        }
    }
1417
1418    pub fn statement_timeout(&self) -> Duration {
1419        if self.config().statement_timeout().millis() == 0 {
1420            Duration::from_secs(self.env.batch_config.statement_timeout_in_sec as u64)
1421        } else {
1422            Duration::from_millis(self.config().statement_timeout().millis() as u64)
1423        }
1424    }
1425
    /// Register a session-local temporary source under its own name,
    /// replacing any previous source with the same name.
    pub fn create_temporary_source(&self, source: SourceCatalog) {
        self.temporary_source_manager
            .lock()
            .create_source(source.name.clone(), source);
    }
1431
1432    pub fn get_temporary_source(&self, name: &str) -> Option<SourceCatalog> {
1433        self.temporary_source_manager
1434            .lock()
1435            .get_source(name)
1436            .cloned()
1437    }
1438
    /// Remove the session-local temporary source with the given name, if any.
    pub fn drop_temporary_source(&self, name: &str) {
        self.temporary_source_manager.lock().drop_source(name);
    }
1442
    /// Snapshot (clone) of the session's temporary source manager.
    pub fn temporary_source_manager(&self) -> TemporarySourceManager {
        self.temporary_source_manager.lock().clone()
    }
1446
    /// Stage a table being created in this session under its own name,
    /// replacing any previous entry with the same name.
    pub fn create_staging_table(&self, table: TableCatalog) {
        self.staging_catalog_manager
            .lock()
            .create_table(table.name.clone(), table);
    }
1452
    /// Remove the staged table with the given name, if any.
    pub fn drop_staging_table(&self, name: &str) {
        self.staging_catalog_manager.lock().drop_table(name);
    }
1456
    /// Snapshot (clone) of the session's staging catalog manager.
    pub fn staging_catalog_manager(&self) -> StagingCatalogManager {
        self.staging_catalog_manager.lock().clone()
    }
1460
    /// Verify cluster limits (currently: actor count per parallelism) fetched
    /// from meta. Exceeding the hard limit returns an error; exceeding only
    /// the soft limit sends a notice to the client. Setting
    /// `bypass_cluster_limits` skips the check entirely.
    pub async fn check_cluster_limits(&self) -> Result<()> {
        if self.config().bypass_cluster_limits() {
            return Ok(());
        }

        // Render the limit report; wording differs for hard vs. soft limit.
        let gen_message = |ActorCountPerParallelism {
                               worker_id_to_actor_count,
                               hard_limit,
                               soft_limit,
                           }: ActorCountPerParallelism,
                           exceed_hard_limit: bool|
         -> String {
            let (limit_type, action) = if exceed_hard_limit {
                ("critical", "Scale the cluster immediately to proceed.")
            } else {
                (
                    "recommended",
                    "Consider scaling the cluster for optimal performance.",
                )
            };
            format!(
                r#"Actor count per parallelism exceeds the {limit_type} limit.

Depending on your workload, this may overload the cluster and cause performance/stability issues. {action}

HINT:
- For best practices on managing streaming jobs: https://docs.risingwave.com/operate/manage-a-large-number-of-streaming-jobs
- To bypass the check (if the cluster load is acceptable): `[ALTER SYSTEM] SET bypass_cluster_limits TO true`.
  See https://docs.risingwave.com/operate/view-configure-runtime-parameters#how-to-configure-runtime-parameters
- Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.

DETAILS:
- hard limit: {hard_limit}
- soft limit: {soft_limit}
- worker_id_to_actor_count: {worker_id_to_actor_count:?}"#,
            )
        };

        let limits = self.env().meta_client().get_cluster_limits().await?;
        for limit in limits {
            match limit {
                cluster_limit::ClusterLimit::ActorCount(l) => {
                    if l.exceed_hard_limit() {
                        // Hard limit: refuse the statement outright.
                        return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
                            l, true,
                        ))));
                    } else if l.exceed_soft_limit() {
                        // Soft limit: warn the client but proceed.
                        self.notice_to_user(gen_message(l, false));
                    }
                }
            }
        }
        Ok(())
    }
1515}
1516
/// Process-wide handle to the session manager, set once at frontend startup.
pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
    std::sync::OnceLock::new();
1519
/// Creates and manages all [`SessionImpl`]s of this frontend process.
pub struct SessionManagerImpl {
    env: FrontendEnv,
    // Kept alive so the background tasks spawned at init are not detached.
    _join_handles: Vec<JoinHandle<()>>,
    // Kept alive so the shutdown channels stay open until the manager drops.
    _shutdown_senders: Vec<Sender<()>>,
    // Monotonic counter; presumably used for assigning session ids — TODO confirm
    // (not referenced within this view).
    number: AtomicI32,
}
1526
impl SessionManager for SessionManagerImpl {
    type Error = RwError;
    type Session = SessionImpl;

    /// Create an internal session bound to `database_id` for background use,
    /// with a placeholder peer address instead of a real client connection.
    fn create_dummy_session(&self, database_id: DatabaseId) -> Result<Arc<Self::Session>> {
        let dummy_addr = Address::Tcp(SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
            5691, // port of meta
        ));

        // Always use the built-in super user for dummy sessions.
        // This avoids permission checks tied to a specific user_id.
        self.connect_inner(
            database_id,
            risingwave_common::catalog::DEFAULT_SUPER_USER,
            Arc::new(dummy_addr),
        )
    }

    /// Establish a session for a client connecting to `database` as `user_name`.
    fn connect(
        &self,
        database: &str,
        user_name: &str,
        peer_addr: AddressRef,
    ) -> Result<Arc<Self::Session>> {
        let catalog_reader = self.env.catalog_reader();
        let reader = catalog_reader.read_guard();
        let database_id = reader.get_database_by_name(database)?.id();

        self.connect_inner(database_id, user_name, peer_addr)
    }

    /// Used when cancel request happened.
    fn cancel_queries_in_session(&self, session_id: SessionId) {
        self.env.cancel_queries_in_session(session_id);
    }

    /// Abort the streaming jobs being created in the given session.
    fn cancel_creating_jobs_in_session(&self, session_id: SessionId) {
        self.env.cancel_creating_jobs_in_session(session_id);
    }

    /// Remove the session from the session map when the connection closes.
    fn end_session(&self, session: &Self::Session) {
        self.delete_session(&session.session_id());
    }

    async fn shutdown(&self) {
        // Clean up the session map.
        self.env.sessions_map().write().clear();
        // Unregister from the meta service.
        self.env.meta_client().try_unregister().await;
    }
}
1579
1580impl SessionManagerImpl {
    /// Initialize the frontend environment and construct the session manager.
    pub async fn new(opts: FrontendOpts) -> Result<Self> {
        // TODO(shutdown): only save join handles that **need** to be shutdown
        let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?;
        Ok(Self {
            env,
            _join_handles: join_handles,
            _shutdown_senders: shutdown_senders,
            number: AtomicI32::new(0),
        })
    }
1591
    /// The frontend environment owned by this manager.
    pub(crate) fn env(&self) -> &FrontendEnv {
        &self.env
    }
1595
    /// Register a new session in the session map and refresh the
    /// active-session gauge from the map size under the same write lock.
    fn insert_session(&self, session: Arc<SessionImpl>) {
        let active_sessions = {
            let mut write_guard = self.env.sessions_map.write();
            write_guard.insert(session.id(), session);
            write_guard.len()
        };
        self.env
            .frontend_metrics
            .active_sessions
            .set(active_sessions as i64);
    }
1607
1608    fn delete_session(&self, session_id: &SessionId) {
1609        let active_sessions = {
1610            let mut write_guard = self.env.sessions_map.write();
1611            write_guard.remove(session_id);
1612            write_guard.len()
1613        };
1614        self.env
1615            .frontend_metrics
1616            .active_sessions
1617            .set(active_sessions as i64);
1618    }
1619
1620    fn connect_inner(
1621        &self,
1622        database_id: DatabaseId,
1623        user_name: &str,
1624        peer_addr: AddressRef,
1625    ) -> Result<Arc<SessionImpl>> {
1626        let catalog_reader = self.env.catalog_reader();
1627        let reader = catalog_reader.read_guard();
1628        let (database_name, database_owner) = {
1629            let db = reader.get_database_by_id(database_id)?;
1630            (db.name(), db.owner())
1631        };
1632
1633        let user_reader = self.env.user_info_reader();
1634        let reader = user_reader.read_guard();
1635        if let Some(user) = reader.get_user_by_name(user_name) {
1636            if !user.can_login {
1637                bail_permission_denied!("User {} is not allowed to login", user_name);
1638            }
1639            let has_privilege = user.has_privilege(database_id, AclMode::Connect);
1640            if !user.is_super && database_owner != user.id && !has_privilege {
1641                bail_permission_denied!("User does not have CONNECT privilege.");
1642            }
1643
1644            // Check HBA configuration for LDAP authentication
1645            let (connection_type, client_addr) = match peer_addr.as_ref() {
1646                Address::Tcp(socket_addr) => (ConnectionType::Host, Some(&socket_addr.ip())),
1647                Address::Unix(_) => (ConnectionType::Local, None),
1648            };
1649            tracing::debug!(
1650                "receive connection: type={:?}, client_addr={:?}",
1651                connection_type,
1652                client_addr
1653            );
1654
1655            let hba_entry_opt = self.env.frontend_config().hba_config.find_matching_entry(
1656                &connection_type,
1657                database_name,
1658                user_name,
1659                client_addr,
1660            );
1661
1662            let Some(hba_entry_opt) = hba_entry_opt else {
1663                bail_permission_denied!(
1664                    "no pg_hba.conf entry for host \"{peer_addr}\", user \"{user_name}\", database \"{database_name}\""
1665                );
1666            };
1667
1668            // Determine the user authenticator based on the user's auth info.
1669            let authenticator_by_info = || -> Result<UserAuthenticator> {
1670                let authenticator = match &user.auth_info {
1671                    None => UserAuthenticator::None,
1672                    Some(auth_info) => match auth_info.encryption_type() {
1673                        EncryptionType::Plaintext => {
1674                            UserAuthenticator::ClearText(auth_info.encrypted_value.clone())
1675                        }
1676                        EncryptionType::Md5 => {
1677                            let mut salt = [0; 4];
1678                            let mut rng = rand::rng();
1679                            rng.fill_bytes(&mut salt);
1680                            UserAuthenticator::Md5WithSalt {
1681                                encrypted_password: md5_hash_with_salt(
1682                                    &auth_info.encrypted_value,
1683                                    &salt,
1684                                ),
1685                                salt,
1686                            }
1687                        }
1688                        EncryptionType::Oauth => UserAuthenticator::OAuth {
1689                            metadata: auth_info.metadata.clone(),
1690                            cluster_id: self.env.meta_client().cluster_id().to_owned(),
1691                        },
1692                        _ => {
1693                            bail_protocol_error!(
1694                                "Unsupported auth type: {}",
1695                                auth_info.encryption_type().as_str_name()
1696                            );
1697                        }
1698                    },
1699                };
1700                Ok(authenticator)
1701            };
1702
1703            let user_authenticator = match (&hba_entry_opt.auth_method, &user.auth_info) {
1704                (AuthMethod::Trust, _) => UserAuthenticator::None,
1705                // For backward compatibility, we allow password auth method to work with any auth info.
1706                (AuthMethod::Password, _) => authenticator_by_info()?,
1707                (AuthMethod::Md5, Some(auth_info))
1708                    if auth_info.encryption_type() == EncryptionType::Md5 =>
1709                {
1710                    authenticator_by_info()?
1711                }
1712                (AuthMethod::OAuth, Some(auth_info))
1713                    if auth_info.encryption_type() == EncryptionType::Oauth =>
1714                {
1715                    authenticator_by_info()?
1716                }
1717                (AuthMethod::Ldap, _) => {
1718                    UserAuthenticator::Ldap(user_name.to_owned(), hba_entry_opt.clone())
1719                }
1720                _ => {
1721                    bail_permission_denied!(
1722                        "password authentication failed for user \"{user_name}\""
1723                    );
1724                }
1725            };
1726
1727            // Assign a session id and insert into sessions map (for cancel request).
1728            let secret_key = self.number.fetch_add(1, Ordering::Relaxed);
1729            // Use a trivial strategy: process_id and secret_key are equal.
1730            let id = (secret_key, secret_key);
1731            // Read session params snapshot from frontend env.
1732            let session_config = self.env.session_params_snapshot();
1733
1734            let session_impl: Arc<SessionImpl> = SessionImpl::new(
1735                self.env.clone(),
1736                AuthContext::new(database_name.to_owned(), user_name.to_owned(), user.id),
1737                user_authenticator,
1738                id,
1739                peer_addr,
1740                session_config,
1741            )
1742            .into();
1743            self.insert_session(session_impl.clone());
1744
1745            Ok(session_impl)
1746        } else {
1747            bail_catalog_error!("Role {} does not exist", user_name);
1748        }
1749    }
1750}
1751
impl Session for SessionImpl {
    type Error = RwError;
    type Portal = Portal;
    type PreparedStatement = PrepareStatement;
    type ValuesStream = PgResponseStream;

    /// A copy of `run_statement` that excludes the parsing step, so each call
    /// runs at most one statement. The SQL text is reconstructed via the
    /// AST's `to_string`; consider reusing the original text later.
    async fn run_one_query(
        self: Arc<Self>,
        stmt: Statement,
        format: Format,
    ) -> Result<PgResponse<PgResponseStream>> {
        let string = stmt.to_string();
        let sql_str = string.as_str();
        let sql: Arc<str> = Arc::from(sql_str);
        // The handle can be slow. Release potential large String early.
        drop(string);
        let rsp = handle(self, stmt, sql, vec![format]).await?;
        Ok(rsp)
    }

    /// The authenticator chosen for this session at connect time.
    fn user_authenticator(&self) -> &UserAuthenticator {
        &self.user_authenticator
    }

    /// The `(process_id, secret_key)` pair identifying this session, used by
    /// cancel requests.
    fn id(&self) -> SessionId {
        self.id
    }

    /// Handles the extended-protocol Parse phase. `None` represents the empty
    /// statement and maps to `PrepareStatement::Empty`.
    async fn parse(
        self: Arc<Self>,
        statement: Option<Statement>,
        params_types: Vec<Option<DataType>>,
    ) -> Result<PrepareStatement> {
        Ok(if let Some(statement) = statement {
            handle_parse(self, statement, params_types).await?
        } else {
            PrepareStatement::Empty
        })
    }

    /// Handles the extended-protocol Bind phase: binds parameter values (and
    /// their wire formats) to a prepared statement, producing a portal.
    fn bind(
        self: Arc<Self>,
        prepare_statement: PrepareStatement,
        params: Vec<Option<Bytes>>,
        param_formats: Vec<Format>,
        result_formats: Vec<Format>,
    ) -> Result<Portal> {
        handle_bind(prepare_statement, params, param_formats, result_formats)
    }

    /// Handles the extended-protocol Execute phase for a bound portal.
    async fn execute(self: Arc<Self>, portal: Portal) -> Result<PgResponse<PgResponseStream>> {
        let rsp = handle_execute(self, portal).await?;
        Ok(rsp)
    }

    /// Returns `(parameter types, row description)` for a prepared statement,
    /// as required by the extended-protocol Describe message.
    fn describe_statement(
        self: Arc<Self>,
        prepare_statement: PrepareStatement,
    ) -> Result<(Vec<DataType>, Vec<PgFieldDescriptor>)> {
        Ok(match prepare_statement {
            PrepareStatement::Empty => (vec![], vec![]),
            PrepareStatement::Prepared(prepare_statement) => (
                prepare_statement.bound_result.param_types,
                infer(
                    Some(prepare_statement.bound_result.bound),
                    prepare_statement.statement,
                )?,
            ),
            // Statements with no bound plan expose no parameters.
            PrepareStatement::PureStatement(statement) => (vec![], infer(None, statement)?),
        })
    }

    /// Returns the row description for a portal, honoring the result formats
    /// requested at Bind time.
    fn describe_portal(self: Arc<Self>, portal: Portal) -> Result<Vec<PgFieldDescriptor>> {
        match portal {
            Portal::Empty => Ok(vec![]),
            Portal::Portal(portal) => {
                let mut columns = infer(Some(portal.bound_result.bound), portal.statement)?;
                // Expand the (possibly single, possibly per-column) format
                // list to exactly one format per column.
                let formats = FormatIterator::new(&portal.result_formats, columns.len())
                    .map_err(|e| RwError::from(ErrorCode::ProtocolError(e)))?;
                columns.iter_mut().zip_eq_fast(formats).for_each(|(c, f)| {
                    if f == Format::Binary {
                        c.set_to_binary()
                    }
                });
                Ok(columns)
            }
            Portal::PureStatement(statement) => Ok(infer(None, statement)?),
        }
    }

    /// Reads a single session configuration value by key.
    fn get_config(&self, key: &str) -> Result<String> {
        self.config().get(key).map_err(Into::into)
    }

    /// Sets a session configuration value, delegating to the inherent
    /// `SessionImpl::set_config` (disambiguated from this trait method).
    fn set_config(&self, key: &str, value: String) -> Result<String> {
        Self::set_config(self, key, value)
    }

    /// Awaits the next asynchronous notice queued for this session.
    ///
    /// The receiver mutex is re-acquired on every poll; panics if the notice
    /// channel is closed, which is treated as a bug.
    async fn next_notice(self: &Arc<Self>) -> String {
        std::future::poll_fn(|cx| self.clone().notice_rx.lock().poll_recv(cx))
            .await
            .expect("notice channel should not be closed")
    }

    /// Maps the internal transaction state to the wire-protocol status byte.
    /// Implicit transactions are reported as idle, matching PostgreSQL.
    fn transaction_status(&self) -> TransactionStatus {
        match &*self.txn.lock() {
            transaction::State::Initial | transaction::State::Implicit(_) => {
                TransactionStatus::Idle
            }
            transaction::State::Explicit(_) => TransactionStatus::InTransaction,
            // TODO: failed transaction
        }
    }

    /// Init and return an `ExecContextGuard` which could be used as a guard to represent the execution flow.
    fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard {
        let exec_context = Arc::new(ExecContext {
            running_sql: sql,
            last_instant: Instant::now(),
            last_idle_instant: self.last_idle_instant.clone(),
        });
        // Only a weak reference is stored; the guard owns the strong one, so
        // a dropped guard makes the session observably idle again.
        *self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
        // unset idle state, since there is a sql running
        *self.last_idle_instant.lock() = None;
        ExecContextGuard::new(exec_context)
    }

    /// Check whether idle transaction timeout.
    /// If yes, unpin snapshot and return an `IdleInTxnTimeout` error.
    fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
        // In transaction.
        if matches!(self.transaction_status(), TransactionStatus::InTransaction) {
            let idle_in_transaction_session_timeout =
                self.config().idle_in_transaction_session_timeout() as u128;
            // Idle transaction timeout has been enabled (0 disables it).
            if idle_in_transaction_session_timeout != 0 {
                // Hold the `exec_context` lock to ensure no new sql coming when unpin_snapshot.
                let guard = self.exec_context.lock();
                // No running sql i.e. idle (the weak ref is dead once the
                // ExecContextGuard of the last statement was dropped).
                if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() {
                    // Idle timeout.
                    if let Some(elapse_since_last_idle_instant) =
                        self.elapse_since_last_idle_instant()
                        && elapse_since_last_idle_instant > idle_in_transaction_session_timeout
                    {
                        return Err(PsqlError::IdleInTxnTimeout);
                    }
                }
            }
        }
        Ok(())
    }
}
1907
1908/// Returns row description of the statement
1909fn infer(bound: Option<BoundStatement>, stmt: Statement) -> Result<Vec<PgFieldDescriptor>> {
1910    match stmt {
1911        Statement::Query(_)
1912        | Statement::Insert { .. }
1913        | Statement::Delete { .. }
1914        | Statement::Update { .. }
1915        | Statement::FetchCursor { .. } => Ok(bound
1916            .unwrap()
1917            .output_fields()
1918            .iter()
1919            .map(to_pg_field)
1920            .collect()),
1921        Statement::ShowObjects {
1922            object: show_object,
1923            ..
1924        } => Ok(infer_show_object(&show_object)),
1925        Statement::ShowCreateObject { .. } => Ok(infer_show_create_object()),
1926        Statement::ShowTransactionIsolationLevel => {
1927            let name = "transaction_isolation";
1928            Ok(infer_show_variable(name))
1929        }
1930        Statement::ShowVariable { variable } => {
1931            let name = &variable[0].real_value().to_lowercase();
1932            Ok(infer_show_variable(name))
1933        }
1934        Statement::Describe { name: _, kind } => Ok(infer_describe(&kind)),
1935        Statement::Explain { .. } => Ok(vec![PgFieldDescriptor::new(
1936            "QUERY PLAN".to_owned(),
1937            DataType::Varchar.to_oid(),
1938            DataType::Varchar.type_len(),
1939        )]),
1940        _ => Ok(vec![]),
1941    }
1942}
1943
/// A process identifier scoped to a specific worker node, serialized as
/// `worker_id:process_id` (see the `Display` and `TryFrom<String>` impls).
pub struct WorkerProcessId {
    // Id of the worker node the process belongs to.
    pub worker_id: WorkerId,
    // Process id within that worker.
    pub process_id: i32,
}
1948
1949impl WorkerProcessId {
1950    pub fn new(worker_id: WorkerId, process_id: i32) -> Self {
1951        Self {
1952            worker_id,
1953            process_id,
1954        }
1955    }
1956}
1957
1958impl Display for WorkerProcessId {
1959    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1960        write!(f, "{}:{}", self.worker_id, self.process_id)
1961    }
1962}
1963
1964impl TryFrom<String> for WorkerProcessId {
1965    type Error = String;
1966
1967    fn try_from(worker_process_id: String) -> std::result::Result<Self, Self::Error> {
1968        const INVALID: &str = "invalid WorkerProcessId";
1969        let splits: Vec<&str> = worker_process_id.split(":").collect();
1970        if splits.len() != 2 {
1971            return Err(INVALID.to_owned());
1972        }
1973        let Ok(worker_id) = splits[0].parse::<u32>() else {
1974            return Err(INVALID.to_owned());
1975        };
1976        let Ok(process_id) = splits[1].parse::<i32>() else {
1977            return Err(INVALID.to_owned());
1978        };
1979        Ok(WorkerProcessId::new(worker_id.into(), process_id))
1980    }
1981}
1982
1983pub fn cancel_queries_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
1984    let guard = sessions_map.read();
1985    if let Some(session) = guard.get(&session_id) {
1986        session.cancel_current_query();
1987        true
1988    } else {
1989        info!("Current session finished, ignoring cancel query request");
1990        false
1991    }
1992}
1993
1994pub fn cancel_creating_jobs_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
1995    let guard = sessions_map.read();
1996    if let Some(session) = guard.get(&session_id) {
1997        session.cancel_current_creating_job();
1998        true
1999    } else {
2000        info!("Current session finished, ignoring cancel creating request");
2001        false
2002    }
2003}