risingwave_frontend/
session.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{Display, Formatter};
17use std::io::{Error, ErrorKind};
18use std::net::{IpAddr, Ipv4Addr, SocketAddr};
19use std::sync::atomic::{AtomicI32, Ordering};
20use std::sync::{Arc, Weak};
21use std::time::{Duration, Instant};
22
23use anyhow::anyhow;
24use bytes::Bytes;
25use either::Either;
26use itertools::Itertools;
27use parking_lot::{Mutex, RwLock, RwLockReadGuard};
28use pgwire::error::{PsqlError, PsqlResult};
29use pgwire::net::{Address, AddressRef};
30use pgwire::pg_field_descriptor::PgFieldDescriptor;
31use pgwire::pg_message::TransactionStatus;
32use pgwire::pg_response::{PgResponse, StatementType};
33use pgwire::pg_server::{
34    BoxedError, ExecContext, ExecContextGuard, Session, SessionId, SessionManager,
35    UserAuthenticator,
36};
37use pgwire::types::{Format, FormatIterator};
38use prometheus_http_query::Client as PrometheusClient;
39use rand::RngCore;
40use risingwave_batch::monitor::{BatchSpillMetrics, GLOBAL_BATCH_SPILL_METRICS};
41use risingwave_batch::spill::spill_op::SpillOp;
42use risingwave_batch::task::{ShutdownSender, ShutdownToken};
43use risingwave_batch::worker_manager::worker_node_manager::{
44    WorkerNodeManager, WorkerNodeManagerRef,
45};
46use risingwave_common::acl::AclMode;
47#[cfg(test)]
48use risingwave_common::catalog::{
49    DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
50};
51use risingwave_common::config::{
52    AuthMethod, BatchConfig, ConnectionType, FrontendConfig, MetaConfig, MetricLevel,
53    StreamingConfig, UdfConfig, load_config,
54};
55use risingwave_common::id::WorkerId;
56use risingwave_common::memory::MemoryContext;
57use risingwave_common::secret::LocalSecretManager;
58use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
59use risingwave_common::system_param::local_manager::{
60    LocalSystemParamsManager, LocalSystemParamsManagerRef,
61};
62use risingwave_common::telemetry::manager::TelemetryManager;
63use risingwave_common::telemetry::telemetry_env_enabled;
64use risingwave_common::types::DataType;
65use risingwave_common::util::addr::HostAddr;
66use risingwave_common::util::cluster_limit;
67use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
68use risingwave_common::util::iter_util::ZipEqFast;
69use risingwave_common::util::pretty_bytes::convert;
70use risingwave_common::util::runtime::BackgroundShutdownRuntime;
71use risingwave_common::{GIT_SHA, RW_VERSION};
72use risingwave_common_heap_profiling::HeapProfiler;
73use risingwave_common_service::{MetricsManager, ObserverManager};
74use risingwave_connector::source::monitor::{GLOBAL_SOURCE_METRICS, SourceMetrics};
75use risingwave_pb::common::WorkerType;
76use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
77use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
78use risingwave_pb::health::health_server::HealthServer;
79use risingwave_pb::user::auth_info::EncryptionType;
80use risingwave_rpc_client::{
81    ComputeClientPool, ComputeClientPoolRef, FrontendClientPool, FrontendClientPoolRef, MetaClient,
82};
83use risingwave_sqlparser::ast::{ObjectName, Statement};
84use risingwave_sqlparser::parser::Parser;
85use thiserror::Error;
86use thiserror_ext::AsReport;
87use tokio::runtime::Builder;
88use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
89use tokio::sync::oneshot::Sender;
90use tokio::sync::watch;
91use tokio::task::JoinHandle;
92use tracing::{error, info};
93
94use self::cursor_manager::CursorManager;
95use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError};
96use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl};
97use crate::catalog::connection_catalog::ConnectionCatalog;
98use crate::catalog::root_catalog::{Catalog, SchemaPath};
99use crate::catalog::secret_catalog::SecretCatalog;
100use crate::catalog::source_catalog::SourceCatalog;
101use crate::catalog::subscription_catalog::SubscriptionCatalog;
102use crate::catalog::{
103    CatalogError, DatabaseId, OwnedByUserCatalog, SchemaId, TableId, check_schema_writable,
104};
105use crate::error::{ErrorCode, Result, RwError};
106use crate::handler::describe::infer_describe;
107use crate::handler::extended_handle::{
108    Portal, PrepareStatement, handle_bind, handle_execute, handle_parse,
109};
110use crate::handler::privilege::ObjectCheckItem;
111use crate::handler::show::{infer_show_create_object, infer_show_object};
112use crate::handler::util::to_pg_field;
113use crate::handler::variable::infer_show_variable;
114use crate::handler::{RwPgResponse, handle};
115use crate::health_service::HealthServiceImpl;
116use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl};
117use crate::monitor::{CursorMetrics, FrontendMetrics, GLOBAL_FRONTEND_METRICS};
118use crate::observer::FrontendObserverNode;
119use crate::rpc::FrontendServiceImpl;
120use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef};
121use crate::scheduler::{
122    DistributedQueryMetrics, GLOBAL_DISTRIBUTED_QUERY_METRICS, HummockSnapshotManager,
123    HummockSnapshotManagerRef, QueryManager,
124};
125use crate::telemetry::FrontendTelemetryCreator;
126use crate::user::UserId;
127use crate::user::user_authentication::md5_hash_with_salt;
128use crate::user::user_manager::UserInfoManager;
129use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl};
130use crate::{FrontendOpts, PgResponseStream, TableCatalog};
131
132pub(crate) mod current;
133pub(crate) mod cursor_manager;
134pub(crate) mod transaction;
135
136/// The global environment for the frontend server.
137#[derive(Clone)]
138pub(crate) struct FrontendEnv {
139    // Different session may access catalog at the same time and catalog is protected by a
140    // RwLock.
141    meta_client: Arc<dyn FrontendMetaClient>,
142    catalog_writer: Arc<dyn CatalogWriter>,
143    catalog_reader: CatalogReader,
144    user_info_writer: Arc<dyn UserInfoWriter>,
145    user_info_reader: UserInfoReader,
146    worker_node_manager: WorkerNodeManagerRef,
147    query_manager: QueryManager,
148    hummock_snapshot_manager: HummockSnapshotManagerRef,
149    system_params_manager: LocalSystemParamsManagerRef,
150    session_params: Arc<RwLock<SessionConfig>>,
151
152    server_addr: HostAddr,
153    client_pool: ComputeClientPoolRef,
154    frontend_client_pool: FrontendClientPoolRef,
155
156    /// Each session is identified by (`process_id`,
157    /// `secret_key`). When Cancel Request received, find corresponding session and cancel all
158    /// running queries.
159    sessions_map: SessionMapRef,
160
161    pub frontend_metrics: Arc<FrontendMetrics>,
162
163    pub cursor_metrics: Arc<CursorMetrics>,
164
165    source_metrics: Arc<SourceMetrics>,
166
167    /// Batch spill metrics
168    spill_metrics: Arc<BatchSpillMetrics>,
169
170    batch_config: BatchConfig,
171    frontend_config: FrontendConfig,
172    #[expect(dead_code)]
173    meta_config: MetaConfig,
174    streaming_config: StreamingConfig,
175    udf_config: UdfConfig,
176
177    /// Track creating streaming jobs, used to cancel creating streaming job when cancel request
178    /// received.
179    creating_streaming_job_tracker: StreamingJobTrackerRef,
180
181    /// Runtime for compute intensive tasks in frontend, e.g. executors in local mode,
182    /// root stage in mpp mode.
183    compute_runtime: Arc<BackgroundShutdownRuntime>,
184
185    /// Memory context used for batch executors in frontend.
186    mem_context: MemoryContext,
187
188    /// address of the serverless backfill controller.
189    serverless_backfill_controller_addr: String,
190
191    /// Prometheus client for querying metrics.
192    prometheus_client: Option<PrometheusClient>,
193
194    /// The additional selector used when querying Prometheus.
195    prometheus_selector: String,
196}
197
198/// Session map identified by `(process_id, secret_key)`
199pub type SessionMapRef = Arc<RwLock<HashMap<(i32, i32), Arc<SessionImpl>>>>;
200
/// The proportion of frontend memory used for batch processing.
///
/// `FrontendEnv::init` multiplies the configured total frontend memory by this factor to obtain
/// the limit of the root batch memory context.
const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5;
203
204impl FrontendEnv {
205    pub fn mock() -> Self {
206        use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter};
207
208        let catalog = Arc::new(RwLock::new(Catalog::default()));
209        let meta_client = Arc::new(MockFrontendMetaClient {});
210        let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
211        let catalog_writer = Arc::new(MockCatalogWriter::new(
212            catalog.clone(),
213            hummock_snapshot_manager.clone(),
214        ));
215        let catalog_reader = CatalogReader::new(catalog);
216        let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
217        let user_info_writer = Arc::new(MockUserInfoWriter::new(user_info_manager.clone()));
218        let user_info_reader = UserInfoReader::new(user_info_manager);
219        let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
220        let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
221        let compute_client_pool = Arc::new(ComputeClientPool::for_test());
222        let frontend_client_pool = Arc::new(FrontendClientPool::for_test());
223        let query_manager = QueryManager::new(
224            worker_node_manager.clone(),
225            compute_client_pool,
226            catalog_reader.clone(),
227            Arc::new(DistributedQueryMetrics::for_test()),
228            None,
229            None,
230        );
231        let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
232        let client_pool = Arc::new(ComputeClientPool::for_test());
233        let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
234        let runtime = {
235            let mut builder = Builder::new_multi_thread();
236            if let Some(frontend_compute_runtime_worker_threads) =
237                load_config("", FrontendOpts::default())
238                    .batch
239                    .frontend_compute_runtime_worker_threads
240            {
241                builder.worker_threads(frontend_compute_runtime_worker_threads);
242            }
243            builder
244                .thread_name("rw-batch-local")
245                .enable_all()
246                .build()
247                .unwrap()
248        };
249        let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
250        let sessions_map = Arc::new(RwLock::new(HashMap::new()));
251        Self {
252            meta_client,
253            catalog_writer,
254            catalog_reader,
255            user_info_writer,
256            user_info_reader,
257            worker_node_manager,
258            query_manager,
259            hummock_snapshot_manager,
260            system_params_manager,
261            session_params: Default::default(),
262            server_addr,
263            client_pool,
264            frontend_client_pool,
265            sessions_map,
266            frontend_metrics: Arc::new(FrontendMetrics::for_test()),
267            cursor_metrics: Arc::new(CursorMetrics::for_test()),
268            batch_config: BatchConfig::default(),
269            frontend_config: FrontendConfig::default(),
270            meta_config: MetaConfig::default(),
271            streaming_config: StreamingConfig::default(),
272            udf_config: UdfConfig::default(),
273            source_metrics: Arc::new(SourceMetrics::default()),
274            spill_metrics: BatchSpillMetrics::for_test(),
275            creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
276            compute_runtime,
277            mem_context: MemoryContext::none(),
278            serverless_backfill_controller_addr: Default::default(),
279            prometheus_client: None,
280            prometheus_selector: String::new(),
281        }
282    }
283
284    pub async fn init(opts: FrontendOpts) -> Result<(Self, Vec<JoinHandle<()>>, Vec<Sender<()>>)> {
285        let config = load_config(&opts.config_path, &opts);
286        info!("Starting frontend node");
287        info!("> config: {:?}", config);
288        info!(
289            "> debug assertions: {}",
290            if cfg!(debug_assertions) { "on" } else { "off" }
291        );
292        info!("> version: {} ({})", RW_VERSION, GIT_SHA);
293
294        let frontend_address: HostAddr = opts
295            .advertise_addr
296            .as_ref()
297            .unwrap_or_else(|| {
298                tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
299                &opts.listen_addr
300            })
301            .parse()
302            .unwrap();
303        info!("advertise addr is {}", frontend_address);
304
305        let rpc_addr: HostAddr = opts.frontend_rpc_listener_addr.parse().unwrap();
306        let internal_rpc_host_addr = HostAddr {
307            // Use the host of advertise address for the frontend rpc address.
308            host: frontend_address.host.clone(),
309            port: rpc_addr.port,
310        };
311        // Register in meta by calling `AddWorkerNode` RPC.
312        let (meta_client, system_params_reader) = MetaClient::register_new(
313            opts.meta_addr,
314            WorkerType::Frontend,
315            &frontend_address,
316            AddWorkerNodeProperty {
317                internal_rpc_host_addr: internal_rpc_host_addr.to_string(),
318                ..Default::default()
319            },
320            Arc::new(config.meta.clone()),
321        )
322        .await;
323
324        let worker_id = meta_client.worker_id();
325        info!("Assigned worker node id {}", worker_id);
326
327        let (heartbeat_join_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
328            meta_client.clone(),
329            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
330        );
331        let mut join_handles = vec![heartbeat_join_handle];
332        let mut shutdown_senders = vec![heartbeat_shutdown_sender];
333
334        let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
335        let hummock_snapshot_manager =
336            Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));
337
338        let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
339        let catalog = Arc::new(RwLock::new(Catalog::default()));
340        let catalog_writer = Arc::new(CatalogWriterImpl::new(
341            meta_client.clone(),
342            catalog_updated_rx.clone(),
343            hummock_snapshot_manager.clone(),
344        ));
345        let catalog_reader = CatalogReader::new(catalog.clone());
346
347        let worker_node_manager = Arc::new(WorkerNodeManager::new());
348
349        let compute_client_pool = Arc::new(ComputeClientPool::new(
350            config.batch_exchange_connection_pool_size(),
351            config.batch.developer.compute_client_config.clone(),
352        ));
353        let frontend_client_pool = Arc::new(FrontendClientPool::new(
354            1,
355            config.batch.developer.frontend_client_config.clone(),
356        ));
357        let query_manager = QueryManager::new(
358            worker_node_manager.clone(),
359            compute_client_pool.clone(),
360            catalog_reader.clone(),
361            Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
362            config.batch.distributed_query_limit,
363            config.batch.max_batch_queries_per_frontend_node,
364        );
365
366        let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
367        let user_info_reader = UserInfoReader::new(user_info_manager.clone());
368        let user_info_writer = Arc::new(UserInfoWriterImpl::new(
369            meta_client.clone(),
370            catalog_updated_rx,
371        ));
372
373        let system_params_manager =
374            Arc::new(LocalSystemParamsManager::new(system_params_reader.clone()));
375
376        LocalSecretManager::init(
377            opts.temp_secret_file_dir,
378            meta_client.cluster_id().to_owned(),
379            worker_id,
380        );
381
382        // This `session_params` should be initialized during the initial notification in `observer_manager`
383        let session_params = Arc::new(RwLock::new(SessionConfig::default()));
384        let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
385        let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));
386
387        let frontend_observer_node = FrontendObserverNode::new(
388            worker_node_manager.clone(),
389            catalog,
390            catalog_updated_tx,
391            user_info_manager,
392            hummock_snapshot_manager.clone(),
393            system_params_manager.clone(),
394            session_params.clone(),
395            compute_client_pool.clone(),
396        );
397        let observer_manager =
398            ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
399                .await;
400        let observer_join_handle = observer_manager.start().await;
401        join_handles.push(observer_join_handle);
402
403        meta_client.activate(&frontend_address).await?;
404
405        let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
406        let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
407        let spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());
408
409        if config.server.metrics_level > MetricLevel::Disabled {
410            MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
411        }
412
413        let health_srv = HealthServiceImpl::new();
414        let frontend_srv = FrontendServiceImpl::new(sessions_map.clone());
415        let frontend_rpc_addr = opts.frontend_rpc_listener_addr.parse().unwrap();
416
417        let telemetry_manager = TelemetryManager::new(
418            Arc::new(meta_client.clone()),
419            Arc::new(FrontendTelemetryCreator::new()),
420        );
421
422        // if the toml config file or env variable disables telemetry, do not watch system params
423        // change because if any of configs disable telemetry, we should never start it
424        if config.server.telemetry_enabled && telemetry_env_enabled() {
425            let (join_handle, shutdown_sender) = telemetry_manager.start().await;
426            join_handles.push(join_handle);
427            shutdown_senders.push(shutdown_sender);
428        } else {
429            tracing::info!("Telemetry didn't start due to config");
430        }
431
432        tokio::spawn(async move {
433            tonic::transport::Server::builder()
434                .add_service(HealthServer::new(health_srv))
435                .add_service(FrontendServiceServer::new(frontend_srv))
436                .serve(frontend_rpc_addr)
437                .await
438                .unwrap();
439        });
440        info!(
441            "Health Check RPC Listener is set up on {}",
442            opts.frontend_rpc_listener_addr.clone()
443        );
444
445        let creating_streaming_job_tracker =
446            Arc::new(StreamingJobTracker::new(frontend_meta_client.clone()));
447
448        let runtime = {
449            let mut builder = Builder::new_multi_thread();
450            if let Some(frontend_compute_runtime_worker_threads) =
451                config.batch.frontend_compute_runtime_worker_threads
452            {
453                builder.worker_threads(frontend_compute_runtime_worker_threads);
454            }
455            builder
456                .thread_name("rw-batch-local")
457                .enable_all()
458                .build()
459                .unwrap()
460        };
461        let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
462
463        let sessions = sessions_map.clone();
464        // Idle transaction background monitor
465        let join_handle = tokio::spawn(async move {
466            let mut check_idle_txn_interval =
467                tokio::time::interval(core::time::Duration::from_secs(10));
468            check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
469            check_idle_txn_interval.reset();
470            loop {
471                check_idle_txn_interval.tick().await;
472                sessions.read().values().for_each(|session| {
473                    let _ = session.check_idle_in_transaction_timeout();
474                })
475            }
476        });
477        join_handles.push(join_handle);
478
479        // Clean up the spill directory.
480        #[cfg(not(madsim))]
481        if config.batch.enable_spill {
482            SpillOp::clean_spill_directory()
483                .await
484                .map_err(|err| anyhow!(err))?;
485        }
486
487        let total_memory_bytes = opts.frontend_total_memory_bytes;
488        let heap_profiler =
489            HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
490        // Run a background heap profiler
491        heap_profiler.start();
492
493        let batch_memory_limit = total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION;
494        let mem_context = MemoryContext::root(
495            frontend_metrics.batch_total_mem.clone(),
496            batch_memory_limit as u64,
497        );
498
499        // Initialize Prometheus client if endpoint is provided
500        let prometheus_client = if let Some(ref endpoint) = opts.prometheus_endpoint {
501            match PrometheusClient::try_from(endpoint.as_str()) {
502                Ok(client) => {
503                    info!("Prometheus client initialized with endpoint: {}", endpoint);
504                    Some(client)
505                }
506                Err(e) => {
507                    error!(
508                        error = %e.as_report(),
509                        "failed to initialize Prometheus client",
510                    );
511                    None
512                }
513            }
514        } else {
515            None
516        };
517
518        // Initialize Prometheus selector
519        let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
520
521        info!(
522            "Frontend  total_memory: {} batch_memory: {}",
523            convert(total_memory_bytes as _),
524            convert(batch_memory_limit as _),
525        );
526
527        Ok((
528            Self {
529                catalog_reader,
530                catalog_writer,
531                user_info_reader,
532                user_info_writer,
533                worker_node_manager,
534                meta_client: frontend_meta_client,
535                query_manager,
536                hummock_snapshot_manager,
537                system_params_manager,
538                session_params,
539                server_addr: frontend_address,
540                client_pool: compute_client_pool,
541                frontend_client_pool,
542                frontend_metrics,
543                cursor_metrics,
544                spill_metrics,
545                sessions_map,
546                batch_config: config.batch,
547                frontend_config: config.frontend,
548                meta_config: config.meta,
549                streaming_config: config.streaming,
550                serverless_backfill_controller_addr: opts.serverless_backfill_controller_addr,
551                udf_config: config.udf,
552                source_metrics,
553                creating_streaming_job_tracker,
554                compute_runtime,
555                mem_context,
556                prometheus_client,
557                prometheus_selector,
558            },
559            join_handles,
560            shutdown_senders,
561        ))
562    }
563
564    /// Get a reference to the frontend env's catalog writer.
565    ///
566    /// This method is intentionally private, and a write guard is required for the caller to
567    /// prove that the write operations are permitted in the current transaction.
568    fn catalog_writer(&self, _guard: transaction::WriteGuard) -> &dyn CatalogWriter {
569        &*self.catalog_writer
570    }
571
572    /// Get a reference to the frontend env's catalog reader.
573    pub fn catalog_reader(&self) -> &CatalogReader {
574        &self.catalog_reader
575    }
576
577    /// Get a reference to the frontend env's user info writer.
578    ///
579    /// This method is intentionally private, and a write guard is required for the caller to
580    /// prove that the write operations are permitted in the current transaction.
581    fn user_info_writer(&self, _guard: transaction::WriteGuard) -> &dyn UserInfoWriter {
582        &*self.user_info_writer
583    }
584
585    /// Get a reference to the frontend env's user info reader.
586    pub fn user_info_reader(&self) -> &UserInfoReader {
587        &self.user_info_reader
588    }
589
590    pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef {
591        self.worker_node_manager.clone()
592    }
593
594    pub fn meta_client(&self) -> &dyn FrontendMetaClient {
595        &*self.meta_client
596    }
597
598    pub fn meta_client_ref(&self) -> Arc<dyn FrontendMetaClient> {
599        self.meta_client.clone()
600    }
601
602    pub fn query_manager(&self) -> &QueryManager {
603        &self.query_manager
604    }
605
606    pub fn hummock_snapshot_manager(&self) -> &HummockSnapshotManagerRef {
607        &self.hummock_snapshot_manager
608    }
609
610    pub fn system_params_manager(&self) -> &LocalSystemParamsManagerRef {
611        &self.system_params_manager
612    }
613
614    pub fn session_params_snapshot(&self) -> SessionConfig {
615        self.session_params.read_recursive().clone()
616    }
617
618    pub fn sbc_address(&self) -> &String {
619        &self.serverless_backfill_controller_addr
620    }
621
622    /// Get a reference to the Prometheus client if available.
623    pub fn prometheus_client(&self) -> Option<&PrometheusClient> {
624        self.prometheus_client.as_ref()
625    }
626
627    /// Get the Prometheus selector string.
628    pub fn prometheus_selector(&self) -> &str {
629        &self.prometheus_selector
630    }
631
632    pub fn server_address(&self) -> &HostAddr {
633        &self.server_addr
634    }
635
636    pub fn client_pool(&self) -> ComputeClientPoolRef {
637        self.client_pool.clone()
638    }
639
640    pub fn frontend_client_pool(&self) -> FrontendClientPoolRef {
641        self.frontend_client_pool.clone()
642    }
643
644    pub fn batch_config(&self) -> &BatchConfig {
645        &self.batch_config
646    }
647
648    pub fn frontend_config(&self) -> &FrontendConfig {
649        &self.frontend_config
650    }
651
652    pub fn streaming_config(&self) -> &StreamingConfig {
653        &self.streaming_config
654    }
655
656    pub fn udf_config(&self) -> &UdfConfig {
657        &self.udf_config
658    }
659
660    pub fn source_metrics(&self) -> Arc<SourceMetrics> {
661        self.source_metrics.clone()
662    }
663
664    pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
665        self.spill_metrics.clone()
666    }
667
668    pub fn creating_streaming_job_tracker(&self) -> &StreamingJobTrackerRef {
669        &self.creating_streaming_job_tracker
670    }
671
672    pub fn sessions_map(&self) -> &SessionMapRef {
673        &self.sessions_map
674    }
675
676    pub fn compute_runtime(&self) -> Arc<BackgroundShutdownRuntime> {
677        self.compute_runtime.clone()
678    }
679
680    /// Cancel queries (i.e. batch queries) in session.
681    /// If the session exists return true, otherwise, return false.
682    pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {
683        cancel_queries_in_session(session_id, self.sessions_map.clone())
684    }
685
686    /// Cancel creating jobs (i.e. streaming queries) in session.
687    /// If the session exists return true, otherwise, return false.
688    pub fn cancel_creating_jobs_in_session(&self, session_id: SessionId) -> bool {
689        cancel_creating_jobs_in_session(session_id, self.sessions_map.clone())
690    }
691
692    pub fn mem_context(&self) -> MemoryContext {
693        self.mem_context.clone()
694    }
695}
696
697#[derive(Clone)]
698pub struct AuthContext {
699    pub database: String,
700    pub user_name: String,
701    pub user_id: UserId,
702}
703
704impl AuthContext {
705    pub fn new(database: String, user_name: String, user_id: UserId) -> Self {
706        Self {
707            database,
708            user_name,
709            user_id,
710        }
711    }
712}
713pub struct SessionImpl {
714    env: FrontendEnv,
715    auth_context: Arc<RwLock<AuthContext>>,
716    /// Used for user authentication.
717    user_authenticator: UserAuthenticator,
718    /// Stores the value of configurations.
719    config_map: Arc<RwLock<SessionConfig>>,
720
721    /// Channel sender for frontend handler to send notices.
722    notice_tx: UnboundedSender<String>,
723    /// Channel receiver for pgwire to take notices and send to clients.
724    notice_rx: Mutex<UnboundedReceiver<String>>,
725
726    /// Identified by `process_id`, `secret_key`. Corresponds to `SessionManager`.
727    id: (i32, i32),
728
729    /// Client address
730    peer_addr: AddressRef,
731
732    /// Transaction state.
733    /// TODO: get rid of the `Mutex` here as a workaround if the `Send` requirement of
734    /// async functions, there should actually be no contention.
735    txn: Arc<Mutex<transaction::State>>,
736
737    /// Query cancel flag.
738    /// This flag is set only when current query is executed in local mode, and used to cancel
739    /// local query.
740    current_query_cancel_flag: Mutex<Option<ShutdownSender>>,
741
742    /// execution context represents the lifetime of a running SQL in the current session
743    exec_context: Mutex<Option<Weak<ExecContext>>>,
744
745    /// Last idle instant
746    last_idle_instant: Arc<Mutex<Option<Instant>>>,
747
748    cursor_manager: Arc<CursorManager>,
749
750    /// temporary sources for the current session
751    temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,
752
753    /// staging catalogs for the current session
754    staging_catalog_manager: Arc<Mutex<StagingCatalogManager>>,
755}
756
/// Session-local registry of temporary sources.
///
/// If TEMPORARY or TEMP is specified, the source is created as a temporary source.
/// Temporary sources are automatically dropped at the end of a session.
/// Temporary sources are expected to be selected by batch queries, not streaming queries.
/// Temporary sources are currently only used by the cloud portal to preview the data during
/// table and source creation, so this is an internal feature that is not exposed to users.
/// The current implementation supports temporary sources with minimum effort,
/// so we don't care about the database name and schema name, but only care about the source name.
/// Temporary sources can only be shown via the `show sources` command but not other system tables.
#[derive(Default, Clone)]
pub struct TemporarySourceManager {
    /// Temporary sources keyed by source name only (no database or schema qualification).
    sources: HashMap<String, SourceCatalog>,
}
769
770impl TemporarySourceManager {
771    pub fn new() -> Self {
772        Self {
773            sources: HashMap::new(),
774        }
775    }
776
777    pub fn create_source(&mut self, name: String, source: SourceCatalog) {
778        self.sources.insert(name, source);
779    }
780
781    pub fn drop_source(&mut self, name: &str) {
782        self.sources.remove(name);
783    }
784
785    pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
786        self.sources.get(name)
787    }
788
789    pub fn keys(&self) -> Vec<String> {
790        self.sources.keys().cloned().collect()
791    }
792}
793
/// Staging catalog manager is used to manage the tables that are being created in the current
/// session.
#[derive(Default, Clone)]
pub struct StagingCatalogManager {
    /// Staging tables being created in the current session, keyed by table name.
    tables: HashMap<String, TableCatalog>,
}
800
801impl StagingCatalogManager {
802    pub fn new() -> Self {
803        Self {
804            tables: HashMap::new(),
805        }
806    }
807
808    pub fn create_table(&mut self, name: String, table: TableCatalog) {
809        self.tables.insert(name, table);
810    }
811
812    pub fn drop_table(&mut self, name: &str) {
813        self.tables.remove(name);
814    }
815
816    pub fn get_table(&self, name: &str) -> Option<&TableCatalog> {
817        self.tables.get(name)
818    }
819}
820
/// Error type returned by `SessionImpl::check_relation_name_duplicated`.
///
/// Both variants are transparent wrappers: their display output is exactly the wrapped error's
/// message, and each can be built from the wrapped error via `From` (so `?` works on both).
#[derive(Error, Debug)]
pub enum CheckRelationError {
    /// Failure reported while resolving the qualified relation name.
    #[error("{0}")]
    Resolve(#[from] ResolveQualifiedNameError),
    /// Failure reported by the catalog (for example a duplicated relation name).
    #[error("{0}")]
    Catalog(#[from] CatalogError),
}
828
829impl From<CheckRelationError> for RwError {
830    fn from(e: CheckRelationError) -> Self {
831        match e {
832            CheckRelationError::Resolve(e) => e.into(),
833            CheckRelationError::Catalog(e) => e.into(),
834        }
835    }
836}
837
838impl SessionImpl {
839    pub(crate) fn new(
840        env: FrontendEnv,
841        auth_context: AuthContext,
842        user_authenticator: UserAuthenticator,
843        id: SessionId,
844        peer_addr: AddressRef,
845        session_config: SessionConfig,
846    ) -> Self {
847        let cursor_metrics = env.cursor_metrics.clone();
848        let (notice_tx, notice_rx) = mpsc::unbounded_channel();
849
850        Self {
851            env,
852            auth_context: Arc::new(RwLock::new(auth_context)),
853            user_authenticator,
854            config_map: Arc::new(RwLock::new(session_config)),
855            id,
856            peer_addr,
857            txn: Default::default(),
858            current_query_cancel_flag: Mutex::new(None),
859            notice_tx,
860            notice_rx: Mutex::new(notice_rx),
861            exec_context: Mutex::new(None),
862            last_idle_instant: Default::default(),
863            cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
864            temporary_source_manager: Default::default(),
865            staging_catalog_manager: Default::default(),
866        }
867    }
868
869    #[cfg(test)]
870    pub fn mock() -> Self {
871        let env = FrontendEnv::mock();
872        let (notice_tx, notice_rx) = mpsc::unbounded_channel();
873
874        Self {
875            env: FrontendEnv::mock(),
876            auth_context: Arc::new(RwLock::new(AuthContext::new(
877                DEFAULT_DATABASE_NAME.to_owned(),
878                DEFAULT_SUPER_USER.to_owned(),
879                DEFAULT_SUPER_USER_ID,
880            ))),
881            user_authenticator: UserAuthenticator::None,
882            config_map: Default::default(),
883            // Mock session use non-sense id.
884            id: (0, 0),
885            txn: Default::default(),
886            current_query_cancel_flag: Mutex::new(None),
887            notice_tx,
888            notice_rx: Mutex::new(notice_rx),
889            exec_context: Mutex::new(None),
890            peer_addr: Address::Tcp(SocketAddr::new(
891                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
892                8080,
893            ))
894            .into(),
895            last_idle_instant: Default::default(),
896            cursor_manager: Arc::new(CursorManager::new(env.cursor_metrics)),
897            temporary_source_manager: Default::default(),
898            staging_catalog_manager: Default::default(),
899        }
900    }
901
902    pub(crate) fn env(&self) -> &FrontendEnv {
903        &self.env
904    }
905
906    pub fn auth_context(&self) -> Arc<AuthContext> {
907        let ctx = self.auth_context.read();
908        Arc::new(ctx.clone())
909    }
910
911    pub fn database(&self) -> String {
912        self.auth_context.read().database.clone()
913    }
914
915    pub fn database_id(&self) -> DatabaseId {
916        let db_name = self.database();
917        self.env
918            .catalog_reader()
919            .read_guard()
920            .get_database_by_name(&db_name)
921            .map(|db| db.id())
922            .expect("session database not found")
923    }
924
925    pub fn user_name(&self) -> String {
926        self.auth_context.read().user_name.clone()
927    }
928
929    pub fn user_id(&self) -> UserId {
930        self.auth_context.read().user_id
931    }
932
933    pub fn update_database(&self, database: String) {
934        self.auth_context.write().database = database;
935    }
936
937    pub fn shared_config(&self) -> Arc<RwLock<SessionConfig>> {
938        Arc::clone(&self.config_map)
939    }
940
941    pub fn config(&self) -> RwLockReadGuard<'_, SessionConfig> {
942        self.config_map.read()
943    }
944
945    pub fn set_config(&self, key: &str, value: String) -> Result<String> {
946        self.config_map
947            .write()
948            .set(key, value, &mut ())
949            .map_err(Into::into)
950    }
951
952    pub fn reset_config(&self, key: &str) -> Result<String> {
953        self.config_map
954            .write()
955            .reset(key, &mut ())
956            .map_err(Into::into)
957    }
958
959    pub fn set_config_report(
960        &self,
961        key: &str,
962        value: Option<String>,
963        mut reporter: impl ConfigReporter,
964    ) -> Result<String> {
965        if let Some(value) = value {
966            self.config_map
967                .write()
968                .set(key, value, &mut reporter)
969                .map_err(Into::into)
970        } else {
971            self.config_map
972                .write()
973                .reset(key, &mut reporter)
974                .map_err(Into::into)
975        }
976    }
977
978    pub fn session_id(&self) -> SessionId {
979        self.id
980    }
981
982    pub fn running_sql(&self) -> Option<Arc<str>> {
983        self.exec_context
984            .lock()
985            .as_ref()
986            .and_then(|weak| weak.upgrade())
987            .map(|context| context.running_sql.clone())
988    }
989
990    pub fn get_cursor_manager(&self) -> Arc<CursorManager> {
991        self.cursor_manager.clone()
992    }
993
994    pub fn peer_addr(&self) -> &Address {
995        &self.peer_addr
996    }
997
998    pub fn elapse_since_running_sql(&self) -> Option<u128> {
999        self.exec_context
1000            .lock()
1001            .as_ref()
1002            .and_then(|weak| weak.upgrade())
1003            .map(|context| context.last_instant.elapsed().as_millis())
1004    }
1005
1006    pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
1007        self.last_idle_instant
1008            .lock()
1009            .as_ref()
1010            .map(|x| x.elapsed().as_millis())
1011    }
1012
1013    pub fn check_relation_name_duplicated(
1014        &self,
1015        name: ObjectName,
1016        stmt_type: StatementType,
1017        if_not_exists: bool,
1018    ) -> std::result::Result<Either<(), RwPgResponse>, CheckRelationError> {
1019        let db_name = &self.database();
1020        let catalog_reader = self.env().catalog_reader().read_guard();
1021        let (schema_name, relation_name) = {
1022            let (schema_name, relation_name) =
1023                Binder::resolve_schema_qualified_name(db_name, &name)?;
1024            let search_path = self.config().search_path();
1025            let user_name = &self.user_name();
1026            let schema_name = match schema_name {
1027                Some(schema_name) => schema_name,
1028                None => catalog_reader
1029                    .first_valid_schema(db_name, &search_path, user_name)?
1030                    .name(),
1031            };
1032            (schema_name, relation_name)
1033        };
1034        match catalog_reader.check_relation_name_duplicated(db_name, &schema_name, &relation_name) {
1035            Err(CatalogError::Duplicated(_, name, is_creating)) if if_not_exists => {
1036                // If relation is created, return directly.
1037                // Otherwise, the job status is `is_creating`. Since frontend receives the catalog asynchronously, We can't
1038                // determine the real status of the meta at this time. We regard it as `not_exists` and delay the check to meta.
1039                // Only the type in StreamingJob (defined in streaming_job.rs) and Subscription may be `is_creating`.
1040                if !is_creating {
1041                    Ok(Either::Right(
1042                        PgResponse::builder(stmt_type)
1043                            .notice(format!("relation \"{}\" already exists, skipping", name))
1044                            .into(),
1045                    ))
1046                } else if stmt_type == StatementType::CREATE_SUBSCRIPTION {
1047                    // For now, when a Subscription is creating, we return directly with an additional message.
1048                    // TODO: Subscription should also be processed in the same way as StreamingJob.
1049                    Ok(Either::Right(
1050                        PgResponse::builder(stmt_type)
1051                            .notice(format!(
1052                                "relation \"{}\" already exists but still creating, skipping",
1053                                name
1054                            ))
1055                            .into(),
1056                    ))
1057                } else {
1058                    Ok(Either::Left(()))
1059                }
1060            }
1061            Err(e) => Err(e.into()),
1062            Ok(_) => Ok(Either::Left(())),
1063        }
1064    }
1065
1066    pub fn check_secret_name_duplicated(&self, name: ObjectName) -> Result<()> {
1067        let db_name = &self.database();
1068        let catalog_reader = self.env().catalog_reader().read_guard();
1069        let (schema_name, secret_name) = {
1070            let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1071            let search_path = self.config().search_path();
1072            let user_name = &self.user_name();
1073            let schema_name = match schema_name {
1074                Some(schema_name) => schema_name,
1075                None => catalog_reader
1076                    .first_valid_schema(db_name, &search_path, user_name)?
1077                    .name(),
1078            };
1079            (schema_name, secret_name)
1080        };
1081        catalog_reader
1082            .check_secret_name_duplicated(db_name, &schema_name, &secret_name)
1083            .map_err(RwError::from)
1084    }
1085
1086    pub fn check_connection_name_duplicated(&self, name: ObjectName) -> Result<()> {
1087        let db_name = &self.database();
1088        let catalog_reader = self.env().catalog_reader().read_guard();
1089        let (schema_name, connection_name) = {
1090            let (schema_name, connection_name) =
1091                Binder::resolve_schema_qualified_name(db_name, &name)?;
1092            let search_path = self.config().search_path();
1093            let user_name = &self.user_name();
1094            let schema_name = match schema_name {
1095                Some(schema_name) => schema_name,
1096                None => catalog_reader
1097                    .first_valid_schema(db_name, &search_path, user_name)?
1098                    .name(),
1099            };
1100            (schema_name, connection_name)
1101        };
1102        catalog_reader
1103            .check_connection_name_duplicated(db_name, &schema_name, &connection_name)
1104            .map_err(RwError::from)
1105    }
1106
1107    pub fn check_function_name_duplicated(
1108        &self,
1109        stmt_type: StatementType,
1110        name: ObjectName,
1111        arg_types: &[DataType],
1112        if_not_exists: bool,
1113    ) -> Result<Either<(), RwPgResponse>> {
1114        let db_name = &self.database();
1115        let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
1116        let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?;
1117
1118        let catalog_reader = self.env().catalog_reader().read_guard();
1119        if catalog_reader
1120            .get_schema_by_id(database_id, schema_id)?
1121            .get_function_by_name_args(&function_name, arg_types)
1122            .is_some()
1123        {
1124            let full_name = format!(
1125                "{function_name}({})",
1126                arg_types.iter().map(|t| t.to_string()).join(",")
1127            );
1128            if if_not_exists {
1129                Ok(Either::Right(
1130                    PgResponse::builder(stmt_type)
1131                        .notice(format!(
1132                            "function \"{}\" already exists, skipping",
1133                            full_name
1134                        ))
1135                        .into(),
1136                ))
1137            } else {
1138                Err(CatalogError::duplicated("function", full_name).into())
1139            }
1140        } else {
1141            Ok(Either::Left(()))
1142        }
1143    }
1144
1145    /// Also check if the user has the privilege to create in the schema.
1146    pub fn get_database_and_schema_id_for_create(
1147        &self,
1148        schema_name: Option<String>,
1149    ) -> Result<(DatabaseId, SchemaId)> {
1150        let db_name = &self.database();
1151
1152        let search_path = self.config().search_path();
1153        let user_name = &self.user_name();
1154
1155        let catalog_reader = self.env().catalog_reader().read_guard();
1156        let schema = match schema_name {
1157            Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
1158            None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
1159        };
1160        let schema_name = schema.name();
1161
1162        check_schema_writable(&schema_name)?;
1163        self.check_privileges(&[ObjectCheckItem::new(
1164            schema.owner(),
1165            AclMode::Create,
1166            schema_name,
1167            schema.id(),
1168        )])?;
1169
1170        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1171        Ok((db_id, schema.id()))
1172    }
1173
1174    pub fn get_connection_by_name(
1175        &self,
1176        schema_name: Option<String>,
1177        connection_name: &str,
1178    ) -> Result<Arc<ConnectionCatalog>> {
1179        let db_name = &self.database();
1180        let search_path = self.config().search_path();
1181        let user_name = &self.user_name();
1182
1183        let catalog_reader = self.env().catalog_reader().read_guard();
1184        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1185        let (connection, _) =
1186            catalog_reader.get_connection_by_name(db_name, schema_path, connection_name)?;
1187
1188        self.check_privileges(&[ObjectCheckItem::new(
1189            connection.owner(),
1190            AclMode::Usage,
1191            connection.name.clone(),
1192            connection.id,
1193        )])?;
1194
1195        Ok(connection.clone())
1196    }
1197
1198    pub fn get_subscription_by_schema_id_name(
1199        &self,
1200        schema_id: SchemaId,
1201        subscription_name: &str,
1202    ) -> Result<Arc<SubscriptionCatalog>> {
1203        let db_name = &self.database();
1204
1205        let catalog_reader = self.env().catalog_reader().read_guard();
1206        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
1207        let schema = catalog_reader.get_schema_by_id(db_id, schema_id)?;
1208        let subscription = schema
1209            .get_subscription_by_name(subscription_name)
1210            .ok_or_else(|| {
1211                RwError::from(ErrorCode::ItemNotFound(format!(
1212                    "subscription {} not found",
1213                    subscription_name
1214                )))
1215            })?;
1216        Ok(subscription.clone())
1217    }
1218
1219    pub fn get_subscription_by_name(
1220        &self,
1221        schema_name: Option<String>,
1222        subscription_name: &str,
1223    ) -> Result<Arc<SubscriptionCatalog>> {
1224        let db_name = &self.database();
1225        let search_path = self.config().search_path();
1226        let user_name = &self.user_name();
1227
1228        let catalog_reader = self.env().catalog_reader().read_guard();
1229        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1230        let (subscription, _) =
1231            catalog_reader.get_subscription_by_name(db_name, schema_path, subscription_name)?;
1232        Ok(subscription.clone())
1233    }
1234
1235    pub fn get_table_by_id(&self, table_id: TableId) -> Result<Arc<TableCatalog>> {
1236        let catalog_reader = self.env().catalog_reader().read_guard();
1237        Ok(catalog_reader.get_any_table_by_id(table_id)?.clone())
1238    }
1239
1240    pub fn get_table_by_name(
1241        &self,
1242        table_name: &str,
1243        db_id: DatabaseId,
1244        schema_id: SchemaId,
1245    ) -> Result<Arc<TableCatalog>> {
1246        let catalog_reader = self.env().catalog_reader().read_guard();
1247        let table = catalog_reader
1248            .get_schema_by_id(db_id, schema_id)?
1249            .get_created_table_by_name(table_name)
1250            .ok_or_else(|| {
1251                Error::new(
1252                    ErrorKind::InvalidInput,
1253                    format!("table \"{}\" does not exist", table_name),
1254                )
1255            })?;
1256
1257        self.check_privileges(&[ObjectCheckItem::new(
1258            table.owner(),
1259            AclMode::Select,
1260            table_name.to_owned(),
1261            table.id,
1262        )])?;
1263
1264        Ok(table.clone())
1265    }
1266
1267    pub fn get_secret_by_name(
1268        &self,
1269        schema_name: Option<String>,
1270        secret_name: &str,
1271    ) -> Result<Arc<SecretCatalog>> {
1272        let db_name = &self.database();
1273        let search_path = self.config().search_path();
1274        let user_name = &self.user_name();
1275
1276        let catalog_reader = self.env().catalog_reader().read_guard();
1277        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
1278        let (secret, _) = catalog_reader.get_secret_by_name(db_name, schema_path, secret_name)?;
1279
1280        self.check_privileges(&[ObjectCheckItem::new(
1281            secret.owner(),
1282            AclMode::Usage,
1283            secret.name.clone(),
1284            secret.id,
1285        )])?;
1286
1287        Ok(secret.clone())
1288    }
1289
1290    pub fn list_change_log_epochs(
1291        &self,
1292        table_id: TableId,
1293        min_epoch: u64,
1294        max_count: u32,
1295    ) -> Result<Vec<u64>> {
1296        Ok(self
1297            .env
1298            .hummock_snapshot_manager()
1299            .acquire()
1300            .list_change_log_epochs(table_id, min_epoch, max_count))
1301    }
1302
1303    pub fn clear_cancel_query_flag(&self) {
1304        let mut flag = self.current_query_cancel_flag.lock();
1305        *flag = None;
1306    }
1307
1308    pub fn reset_cancel_query_flag(&self) -> ShutdownToken {
1309        let mut flag = self.current_query_cancel_flag.lock();
1310        let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
1311        *flag = Some(shutdown_tx);
1312        shutdown_rx
1313    }
1314
1315    pub fn cancel_current_query(&self) {
1316        let mut flag_guard = self.current_query_cancel_flag.lock();
1317        if let Some(sender) = flag_guard.take() {
1318            info!("Trying to cancel query in local mode.");
1319            // Current running query is in local mode
1320            sender.cancel();
1321            info!("Cancel query request sent.");
1322        } else {
1323            info!("Trying to cancel query in distributed mode.");
1324            self.env.query_manager().cancel_queries_in_session(self.id)
1325        }
1326    }
1327
1328    pub fn cancel_current_creating_job(&self) {
1329        self.env.creating_streaming_job_tracker.abort_jobs(self.id);
1330    }
1331
1332    /// This function only used for test now.
1333    /// Maybe we can remove it in the future.
1334    pub async fn run_statement(
1335        self: Arc<Self>,
1336        sql: Arc<str>,
1337        formats: Vec<Format>,
1338    ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
1339        // Parse sql.
1340        let mut stmts = Parser::parse_sql(&sql)?;
1341        if stmts.is_empty() {
1342            return Ok(PgResponse::empty_result(
1343                pgwire::pg_response::StatementType::EMPTY,
1344            ));
1345        }
1346        if stmts.len() > 1 {
1347            return Ok(
1348                PgResponse::builder(pgwire::pg_response::StatementType::EMPTY)
1349                    .notice("cannot insert multiple commands into statement")
1350                    .into(),
1351            );
1352        }
1353        let stmt = stmts.swap_remove(0);
1354        let rsp = handle(self, stmt, sql.clone(), formats).await?;
1355        Ok(rsp)
1356    }
1357
1358    pub fn notice_to_user(&self, str: impl Into<String>) {
1359        let notice = str.into();
1360        tracing::trace!(notice, "notice to user");
1361        self.notice_tx
1362            .send(notice)
1363            .expect("notice channel should not be closed");
1364    }
1365
1366    pub fn is_barrier_read(&self) -> bool {
1367        match self.config().visibility_mode() {
1368            VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
1369            VisibilityMode::All => true,
1370            VisibilityMode::Checkpoint => false,
1371        }
1372    }
1373
1374    pub fn statement_timeout(&self) -> Duration {
1375        if self.config().statement_timeout() == 0 {
1376            Duration::from_secs(self.env.batch_config.statement_timeout_in_sec as u64)
1377        } else {
1378            Duration::from_secs(self.config().statement_timeout() as u64)
1379        }
1380    }
1381
1382    pub fn create_temporary_source(&self, source: SourceCatalog) {
1383        self.temporary_source_manager
1384            .lock()
1385            .create_source(source.name.clone(), source);
1386    }
1387
1388    pub fn get_temporary_source(&self, name: &str) -> Option<SourceCatalog> {
1389        self.temporary_source_manager
1390            .lock()
1391            .get_source(name)
1392            .cloned()
1393    }
1394
1395    pub fn drop_temporary_source(&self, name: &str) {
1396        self.temporary_source_manager.lock().drop_source(name);
1397    }
1398
1399    pub fn temporary_source_manager(&self) -> TemporarySourceManager {
1400        self.temporary_source_manager.lock().clone()
1401    }
1402
1403    pub fn create_staging_table(&self, table: TableCatalog) {
1404        self.staging_catalog_manager
1405            .lock()
1406            .create_table(table.name.clone(), table);
1407    }
1408
1409    pub fn drop_staging_table(&self, name: &str) {
1410        self.staging_catalog_manager.lock().drop_table(name);
1411    }
1412
1413    pub fn staging_catalog_manager(&self) -> StagingCatalogManager {
1414        self.staging_catalog_manager.lock().clone()
1415    }
1416
1417    pub async fn check_cluster_limits(&self) -> Result<()> {
1418        if self.config().bypass_cluster_limits() {
1419            return Ok(());
1420        }
1421
1422        let gen_message = |ActorCountPerParallelism {
1423                               worker_id_to_actor_count,
1424                               hard_limit,
1425                               soft_limit,
1426                           }: ActorCountPerParallelism,
1427                           exceed_hard_limit: bool|
1428         -> String {
1429            let (limit_type, action) = if exceed_hard_limit {
1430                ("critical", "Scale the cluster immediately to proceed.")
1431            } else {
1432                (
1433                    "recommended",
1434                    "Consider scaling the cluster for optimal performance.",
1435                )
1436            };
1437            format!(
1438                r#"Actor count per parallelism exceeds the {limit_type} limit.
1439
1440Depending on your workload, this may overload the cluster and cause performance/stability issues. {action}
1441
1442HINT:
1443- For best practices on managing streaming jobs: https://docs.risingwave.com/operate/manage-a-large-number-of-streaming-jobs
1444- To bypass the check (if the cluster load is acceptable): `[ALTER SYSTEM] SET bypass_cluster_limits TO true`.
1445  See https://docs.risingwave.com/operate/view-configure-runtime-parameters#how-to-configure-runtime-parameters
1446- Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.
1447
1448DETAILS:
1449- hard limit: {hard_limit}
1450- soft limit: {soft_limit}
1451- worker_id_to_actor_count: {worker_id_to_actor_count:?}"#,
1452            )
1453        };
1454
1455        let limits = self.env().meta_client().get_cluster_limits().await?;
1456        for limit in limits {
1457            match limit {
1458                cluster_limit::ClusterLimit::ActorCount(l) => {
1459                    if l.exceed_hard_limit() {
1460                        return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
1461                            l, true,
1462                        ))));
1463                    } else if l.exceed_soft_limit() {
1464                        self.notice_to_user(gen_message(l, false));
1465                    }
1466                }
1467            }
1468        }
1469        Ok(())
1470    }
1471}
1472
/// Process-wide slot holding the shared [`SessionManagerImpl`].
///
/// The slot starts out empty and, being a `OnceLock`, can be initialized at most once.
pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
    std::sync::OnceLock::new();
1475
/// The frontend's implementation of the `SessionManager` trait, built around a [`FrontendEnv`].
pub struct SessionManagerImpl {
    /// Frontend environment; the session map and session metrics are reached through it.
    env: FrontendEnv,
    /// Join handles returned by `FrontendEnv::init`; stored here but not read in the code shown.
    _join_handles: Vec<JoinHandle<()>>,
    /// Shutdown senders returned by `FrontendEnv::init`; stored here but not read in the code
    /// shown.
    _shutdown_senders: Vec<Sender<()>>,
    /// Counter that starts at 0.
    /// NOTE(review): presumably used to allocate session ids — confirm in `connect_inner`.
    number: AtomicI32,
}
1482
1483impl SessionManager for SessionManagerImpl {
1484    type Session = SessionImpl;
1485
1486    fn create_dummy_session(
1487        &self,
1488        database_id: DatabaseId,
1489        user_id: u32,
1490    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
1491        let dummy_addr = Address::Tcp(SocketAddr::new(
1492            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1493            5691, // port of meta
1494        ));
1495        let user_reader = self.env.user_info_reader();
1496        let reader = user_reader.read_guard();
1497        if let Some(user_name) = reader.get_user_name_by_id(user_id) {
1498            self.connect_inner(database_id, user_name.as_str(), Arc::new(dummy_addr))
1499        } else {
1500            Err(Box::new(Error::new(
1501                ErrorKind::InvalidInput,
1502                format!("Role id {} does not exist", user_id),
1503            )))
1504        }
1505    }
1506
1507    fn connect(
1508        &self,
1509        database: &str,
1510        user_name: &str,
1511        peer_addr: AddressRef,
1512    ) -> std::result::Result<Arc<Self::Session>, BoxedError> {
1513        let catalog_reader = self.env.catalog_reader();
1514        let reader = catalog_reader.read_guard();
1515        let database_id = reader
1516            .get_database_by_name(database)
1517            .map_err(|_| {
1518                Box::new(Error::new(
1519                    ErrorKind::InvalidInput,
1520                    format!("database \"{}\" does not exist", database),
1521                ))
1522            })?
1523            .id();
1524
1525        self.connect_inner(database_id, user_name, peer_addr)
1526    }
1527
1528    /// Used when cancel request happened.
1529    fn cancel_queries_in_session(&self, session_id: SessionId) {
1530        self.env.cancel_queries_in_session(session_id);
1531    }
1532
1533    fn cancel_creating_jobs_in_session(&self, session_id: SessionId) {
1534        self.env.cancel_creating_jobs_in_session(session_id);
1535    }
1536
1537    fn end_session(&self, session: &Self::Session) {
1538        self.delete_session(&session.session_id());
1539    }
1540
1541    async fn shutdown(&self) {
1542        // Clean up the session map.
1543        self.env.sessions_map().write().clear();
1544        // Unregister from the meta service.
1545        self.env.meta_client().try_unregister().await;
1546    }
1547}
1548
1549impl SessionManagerImpl {
1550    pub async fn new(opts: FrontendOpts) -> Result<Self> {
1551        // TODO(shutdown): only save join handles that **need** to be shutdown
1552        let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?;
1553        Ok(Self {
1554            env,
1555            _join_handles: join_handles,
1556            _shutdown_senders: shutdown_senders,
1557            number: AtomicI32::new(0),
1558        })
1559    }
1560
1561    pub(crate) fn env(&self) -> &FrontendEnv {
1562        &self.env
1563    }
1564
1565    fn insert_session(&self, session: Arc<SessionImpl>) {
1566        let active_sessions = {
1567            let mut write_guard = self.env.sessions_map.write();
1568            write_guard.insert(session.id(), session);
1569            write_guard.len()
1570        };
1571        self.env
1572            .frontend_metrics
1573            .active_sessions
1574            .set(active_sessions as i64);
1575    }
1576
1577    fn delete_session(&self, session_id: &SessionId) {
1578        let active_sessions = {
1579            let mut write_guard = self.env.sessions_map.write();
1580            write_guard.remove(session_id);
1581            write_guard.len()
1582        };
1583        self.env
1584            .frontend_metrics
1585            .active_sessions
1586            .set(active_sessions as i64);
1587    }
1588
1589    fn connect_inner(
1590        &self,
1591        database_id: DatabaseId,
1592        user_name: &str,
1593        peer_addr: AddressRef,
1594    ) -> std::result::Result<Arc<SessionImpl>, BoxedError> {
1595        let catalog_reader = self.env.catalog_reader();
1596        let reader = catalog_reader.read_guard();
1597        let (database_name, database_owner) = {
1598            let db = reader.get_database_by_id(database_id).map_err(|_| {
1599                Box::new(Error::new(
1600                    ErrorKind::InvalidInput,
1601                    format!("database \"{}\" does not exist", database_id),
1602                ))
1603            })?;
1604            (db.name(), db.owner())
1605        };
1606
1607        let user_reader = self.env.user_info_reader();
1608        let reader = user_reader.read_guard();
1609        if let Some(user) = reader.get_user_by_name(user_name) {
1610            if !user.can_login {
1611                return Err(Box::new(Error::new(
1612                    ErrorKind::InvalidInput,
1613                    format!("User {} is not allowed to login", user_name),
1614                )));
1615            }
1616            let has_privilege = user.has_privilege(database_id, AclMode::Connect);
1617            if !user.is_super && database_owner != user.id && !has_privilege {
1618                return Err(Box::new(Error::new(
1619                    ErrorKind::PermissionDenied,
1620                    "User does not have CONNECT privilege.",
1621                )));
1622            }
1623
1624            // Check HBA configuration for LDAP authentication
1625            let (connection_type, client_addr) = match peer_addr.as_ref() {
1626                Address::Tcp(socket_addr) => (ConnectionType::Host, Some(&socket_addr.ip())),
1627                Address::Unix(_) => (ConnectionType::Local, None),
1628            };
1629            tracing::debug!(
1630                "receive connection: type={:?}, client_addr={:?}",
1631                connection_type,
1632                client_addr
1633            );
1634
1635            let hba_entry_opt = self.env.frontend_config().hba_config.find_matching_entry(
1636                &connection_type,
1637                database_name,
1638                user_name,
1639                client_addr,
1640            );
1641
1642            // TODO: adding `FATAL` message support for no matching HBA entry.
1643            let Some(hba_entry_opt) = hba_entry_opt else {
1644                return Err(Box::new(Error::new(
1645                    ErrorKind::PermissionDenied,
1646                    format!(
1647                        "no pg_hba.conf entry for host \"{peer_addr}\", user \"{user_name}\", database \"{database_name}\""
1648                    ),
1649                )));
1650            };
1651
1652            // Determine the user authenticator based on the user's auth info.
1653            let authenticator_by_info = || -> std::result::Result<UserAuthenticator, BoxedError> {
1654                let authenticator = match &user.auth_info {
1655                    None => UserAuthenticator::None,
1656                    Some(auth_info) => match auth_info.encryption_type() {
1657                        EncryptionType::Plaintext => {
1658                            UserAuthenticator::ClearText(auth_info.encrypted_value.clone())
1659                        }
1660                        EncryptionType::Md5 => {
1661                            let mut salt = [0; 4];
1662                            let mut rng = rand::rng();
1663                            rng.fill_bytes(&mut salt);
1664                            UserAuthenticator::Md5WithSalt {
1665                                encrypted_password: md5_hash_with_salt(
1666                                    &auth_info.encrypted_value,
1667                                    &salt,
1668                                ),
1669                                salt,
1670                            }
1671                        }
1672                        EncryptionType::Oauth => UserAuthenticator::OAuth {
1673                            metadata: auth_info.metadata.clone(),
1674                            cluster_id: self.env.meta_client().cluster_id().to_owned(),
1675                        },
1676                        _ => {
1677                            return Err(Box::new(Error::new(
1678                                ErrorKind::Unsupported,
1679                                format!(
1680                                    "Unsupported auth type: {}",
1681                                    auth_info.encryption_type().as_str_name()
1682                                ),
1683                            )));
1684                        }
1685                    },
1686                };
1687                Ok(authenticator)
1688            };
1689
1690            let user_authenticator = match (&hba_entry_opt.auth_method, &user.auth_info) {
1691                (AuthMethod::Trust, _) => UserAuthenticator::None,
1692                // For backward compatibility, we allow password auth method to work with any auth info.
1693                (AuthMethod::Password, _) => authenticator_by_info()?,
1694                (AuthMethod::Md5, Some(auth_info))
1695                    if auth_info.encryption_type() == EncryptionType::Md5 =>
1696                {
1697                    authenticator_by_info()?
1698                }
1699                (AuthMethod::OAuth, Some(auth_info))
1700                    if auth_info.encryption_type() == EncryptionType::Oauth =>
1701                {
1702                    authenticator_by_info()?
1703                }
1704                (AuthMethod::Ldap, _) => {
1705                    UserAuthenticator::Ldap(user_name.to_owned(), hba_entry_opt.clone())
1706                }
1707                _ => {
1708                    return Err(Box::new(Error::new(
1709                        ErrorKind::PermissionDenied,
1710                        format!("password authentication failed for user \"{user_name}\""),
1711                    )));
1712                }
1713            };
1714
1715            // Assign a session id and insert into sessions map (for cancel request).
1716            let secret_key = self.number.fetch_add(1, Ordering::Relaxed);
1717            // Use a trivial strategy: process_id and secret_key are equal.
1718            let id = (secret_key, secret_key);
1719            // Read session params snapshot from frontend env.
1720            let session_config = self.env.session_params_snapshot();
1721
1722            let session_impl: Arc<SessionImpl> = SessionImpl::new(
1723                self.env.clone(),
1724                AuthContext::new(database_name.to_owned(), user_name.to_owned(), user.id),
1725                user_authenticator,
1726                id,
1727                peer_addr,
1728                session_config,
1729            )
1730            .into();
1731            self.insert_session(session_impl.clone());
1732
1733            Ok(session_impl)
1734        } else {
1735            Err(Box::new(Error::new(
1736                ErrorKind::InvalidInput,
1737                format!("Role {} does not exist", user_name),
1738            )))
1739        }
1740    }
1741}
1742
1743impl Session for SessionImpl {
1744    type Portal = Portal;
1745    type PreparedStatement = PrepareStatement;
1746    type ValuesStream = PgResponseStream;
1747
1748    /// A copy of `run_statement` but exclude the parser part so each run must be at most one
1749    /// statement. The str sql use the `to_string` of AST. Consider Reuse later.
1750    async fn run_one_query(
1751        self: Arc<Self>,
1752        stmt: Statement,
1753        format: Format,
1754    ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
1755        let string = stmt.to_string();
1756        let sql_str = string.as_str();
1757        let sql: Arc<str> = Arc::from(sql_str);
1758        // The handle can be slow. Release potential large String early.
1759        drop(string);
1760        let rsp = handle(self, stmt, sql, vec![format]).await?;
1761        Ok(rsp)
1762    }
1763
1764    fn user_authenticator(&self) -> &UserAuthenticator {
1765        &self.user_authenticator
1766    }
1767
1768    fn id(&self) -> SessionId {
1769        self.id
1770    }
1771
1772    async fn parse(
1773        self: Arc<Self>,
1774        statement: Option<Statement>,
1775        params_types: Vec<Option<DataType>>,
1776    ) -> std::result::Result<PrepareStatement, BoxedError> {
1777        Ok(if let Some(statement) = statement {
1778            handle_parse(self, statement, params_types).await?
1779        } else {
1780            PrepareStatement::Empty
1781        })
1782    }
1783
1784    fn bind(
1785        self: Arc<Self>,
1786        prepare_statement: PrepareStatement,
1787        params: Vec<Option<Bytes>>,
1788        param_formats: Vec<Format>,
1789        result_formats: Vec<Format>,
1790    ) -> std::result::Result<Portal, BoxedError> {
1791        Ok(handle_bind(
1792            prepare_statement,
1793            params,
1794            param_formats,
1795            result_formats,
1796        )?)
1797    }
1798
1799    async fn execute(
1800        self: Arc<Self>,
1801        portal: Portal,
1802    ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
1803        let rsp = handle_execute(self, portal).await?;
1804        Ok(rsp)
1805    }
1806
1807    fn describe_statement(
1808        self: Arc<Self>,
1809        prepare_statement: PrepareStatement,
1810    ) -> std::result::Result<(Vec<DataType>, Vec<PgFieldDescriptor>), BoxedError> {
1811        Ok(match prepare_statement {
1812            PrepareStatement::Empty => (vec![], vec![]),
1813            PrepareStatement::Prepared(prepare_statement) => (
1814                prepare_statement.bound_result.param_types,
1815                infer(
1816                    Some(prepare_statement.bound_result.bound),
1817                    prepare_statement.statement,
1818                )?,
1819            ),
1820            PrepareStatement::PureStatement(statement) => (vec![], infer(None, statement)?),
1821        })
1822    }
1823
1824    fn describe_portal(
1825        self: Arc<Self>,
1826        portal: Portal,
1827    ) -> std::result::Result<Vec<PgFieldDescriptor>, BoxedError> {
1828        match portal {
1829            Portal::Empty => Ok(vec![]),
1830            Portal::Portal(portal) => {
1831                let mut columns = infer(Some(portal.bound_result.bound), portal.statement)?;
1832                let formats = FormatIterator::new(&portal.result_formats, columns.len())?;
1833                columns.iter_mut().zip_eq_fast(formats).for_each(|(c, f)| {
1834                    if f == Format::Binary {
1835                        c.set_to_binary()
1836                    }
1837                });
1838                Ok(columns)
1839            }
1840            Portal::PureStatement(statement) => Ok(infer(None, statement)?),
1841        }
1842    }
1843
1844    fn get_config(&self, key: &str) -> std::result::Result<String, BoxedError> {
1845        self.config().get(key).map_err(Into::into)
1846    }
1847
1848    fn set_config(&self, key: &str, value: String) -> std::result::Result<String, BoxedError> {
1849        Self::set_config(self, key, value).map_err(Into::into)
1850    }
1851
1852    async fn next_notice(self: &Arc<Self>) -> String {
1853        std::future::poll_fn(|cx| self.clone().notice_rx.lock().poll_recv(cx))
1854            .await
1855            .expect("notice channel should not be closed")
1856    }
1857
1858    fn transaction_status(&self) -> TransactionStatus {
1859        match &*self.txn.lock() {
1860            transaction::State::Initial | transaction::State::Implicit(_) => {
1861                TransactionStatus::Idle
1862            }
1863            transaction::State::Explicit(_) => TransactionStatus::InTransaction,
1864            // TODO: failed transaction
1865        }
1866    }
1867
1868    /// Init and return an `ExecContextGuard` which could be used as a guard to represent the execution flow.
1869    fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard {
1870        let exec_context = Arc::new(ExecContext {
1871            running_sql: sql,
1872            last_instant: Instant::now(),
1873            last_idle_instant: self.last_idle_instant.clone(),
1874        });
1875        *self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
1876        // unset idle state, since there is a sql running
1877        *self.last_idle_instant.lock() = None;
1878        ExecContextGuard::new(exec_context)
1879    }
1880
1881    /// Check whether idle transaction timeout.
1882    /// If yes, unpin snapshot and return an `IdleInTxnTimeout` error.
1883    fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
1884        // In transaction.
1885        if matches!(self.transaction_status(), TransactionStatus::InTransaction) {
1886            let idle_in_transaction_session_timeout =
1887                self.config().idle_in_transaction_session_timeout() as u128;
1888            // Idle transaction timeout has been enabled.
1889            if idle_in_transaction_session_timeout != 0 {
1890                // Hold the `exec_context` lock to ensure no new sql coming when unpin_snapshot.
1891                let guard = self.exec_context.lock();
1892                // No running sql i.e. idle
1893                if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() {
1894                    // Idle timeout.
1895                    if let Some(elapse_since_last_idle_instant) =
1896                        self.elapse_since_last_idle_instant()
1897                        && elapse_since_last_idle_instant > idle_in_transaction_session_timeout
1898                    {
1899                        return Err(PsqlError::IdleInTxnTimeout);
1900                    }
1901                }
1902            }
1903        }
1904        Ok(())
1905    }
1906}
1907
1908/// Returns row description of the statement
1909fn infer(bound: Option<BoundStatement>, stmt: Statement) -> Result<Vec<PgFieldDescriptor>> {
1910    match stmt {
1911        Statement::Query(_)
1912        | Statement::Insert { .. }
1913        | Statement::Delete { .. }
1914        | Statement::Update { .. }
1915        | Statement::FetchCursor { .. } => Ok(bound
1916            .unwrap()
1917            .output_fields()
1918            .iter()
1919            .map(to_pg_field)
1920            .collect()),
1921        Statement::ShowObjects {
1922            object: show_object,
1923            ..
1924        } => Ok(infer_show_object(&show_object)),
1925        Statement::ShowCreateObject { .. } => Ok(infer_show_create_object()),
1926        Statement::ShowTransactionIsolationLevel => {
1927            let name = "transaction_isolation";
1928            Ok(infer_show_variable(name))
1929        }
1930        Statement::ShowVariable { variable } => {
1931            let name = &variable[0].real_value().to_lowercase();
1932            Ok(infer_show_variable(name))
1933        }
1934        Statement::Describe { name: _, kind } => Ok(infer_describe(&kind)),
1935        Statement::Explain { .. } => Ok(vec![PgFieldDescriptor::new(
1936            "QUERY PLAN".to_owned(),
1937            DataType::Varchar.to_oid(),
1938            DataType::Varchar.type_len(),
1939        )]),
1940        _ => Ok(vec![]),
1941    }
1942}
1943
1944pub struct WorkerProcessId {
1945    pub worker_id: WorkerId,
1946    pub process_id: i32,
1947}
1948
1949impl WorkerProcessId {
1950    pub fn new(worker_id: WorkerId, process_id: i32) -> Self {
1951        Self {
1952            worker_id,
1953            process_id,
1954        }
1955    }
1956}
1957
1958impl Display for WorkerProcessId {
1959    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1960        write!(f, "{}:{}", self.worker_id, self.process_id)
1961    }
1962}
1963
1964impl TryFrom<String> for WorkerProcessId {
1965    type Error = String;
1966
1967    fn try_from(worker_process_id: String) -> std::result::Result<Self, Self::Error> {
1968        const INVALID: &str = "invalid WorkerProcessId";
1969        let splits: Vec<&str> = worker_process_id.split(":").collect();
1970        if splits.len() != 2 {
1971            return Err(INVALID.to_owned());
1972        }
1973        let Ok(worker_id) = splits[0].parse::<u32>() else {
1974            return Err(INVALID.to_owned());
1975        };
1976        let Ok(process_id) = splits[1].parse::<i32>() else {
1977            return Err(INVALID.to_owned());
1978        };
1979        Ok(WorkerProcessId::new(worker_id.into(), process_id))
1980    }
1981}
1982
1983pub fn cancel_queries_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
1984    let guard = sessions_map.read();
1985    if let Some(session) = guard.get(&session_id) {
1986        session.cancel_current_query();
1987        true
1988    } else {
1989        info!("Current session finished, ignoring cancel query request");
1990        false
1991    }
1992}
1993
1994pub fn cancel_creating_jobs_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
1995    let guard = sessions_map.read();
1996    if let Some(session) = guard.get(&session_id) {
1997        session.cancel_current_creating_job();
1998        true
1999    } else {
2000        info!("Current session finished, ignoring cancel creating request");
2001        false
2002    }
2003}