risingwave_frontend/session.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::io::{Error, ErrorKind};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

use anyhow::anyhow;
use bytes::Bytes;
use either::Either;
use itertools::Itertools;
use parking_lot::{Mutex, RwLock, RwLockReadGuard};
use pgwire::error::{PsqlError, PsqlResult};
use pgwire::net::{Address, AddressRef};
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_message::TransactionStatus;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::pg_server::{
    BoxedError, ExecContext, ExecContextGuard, Session, SessionId, SessionManager,
    UserAuthenticator,
};
use pgwire::types::{Format, FormatIterator};
use prometheus_http_query::Client as PrometheusClient;
use rand::RngCore;
use risingwave_batch::monitor::{BatchSpillMetrics, GLOBAL_BATCH_SPILL_METRICS};
use risingwave_batch::spill::spill_op::SpillOp;
use risingwave_batch::task::{ShutdownSender, ShutdownToken};
use risingwave_batch::worker_manager::worker_node_manager::{
    WorkerNodeManager, WorkerNodeManagerRef,
};
use risingwave_common::acl::AclMode;
#[cfg(test)]
use risingwave_common::catalog::{
    DEFAULT_DATABASE_NAME, DEFAULT_SUPER_USER, DEFAULT_SUPER_USER_ID,
};
use risingwave_common::config::{
    AuthMethod, BatchConfig, ConnectionType, FrontendConfig, MetaConfig, MetricLevel,
    RpcClientConfig, StreamingConfig, UdfConfig, load_config,
};
use risingwave_common::id::WorkerId;
use risingwave_common::memory::MemoryContext;
use risingwave_common::secret::LocalSecretManager;
use risingwave_common::session_config::{ConfigReporter, SessionConfig, VisibilityMode};
use risingwave_common::system_param::local_manager::{
    LocalSystemParamsManager, LocalSystemParamsManagerRef,
};
use risingwave_common::telemetry::manager::TelemetryManager;
use risingwave_common::telemetry::telemetry_env_enabled;
use risingwave_common::types::DataType;
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::cluster_limit;
use risingwave_common::util::cluster_limit::ActorCountPerParallelism;
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::pretty_bytes::convert;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_common::{GIT_SHA, RW_VERSION};
use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_common_service::{MetricsManager, ObserverManager};
use risingwave_connector::source::monitor::{GLOBAL_SOURCE_METRICS, SourceMetrics};
use risingwave_pb::common::WorkerType;
use risingwave_pb::common::worker_node::Property as AddWorkerNodeProperty;
use risingwave_pb::frontend_service::frontend_service_server::FrontendServiceServer;
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::monitor_service::monitor_service_server::MonitorServiceServer;
use risingwave_pb::user::auth_info::EncryptionType;
use risingwave_rpc_client::{
    ComputeClientPool, ComputeClientPoolRef, FrontendClientPool, FrontendClientPoolRef, MetaClient,
    MonitorClientPool, MonitorClientPoolRef,
};
use risingwave_sqlparser::ast::{ObjectName, Statement};
use risingwave_sqlparser::parser::Parser;
use thiserror::Error;
use thiserror_ext::AsReport;
use tokio::runtime::Builder;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::Sender;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tracing::{error, info};

use self::cursor_manager::CursorManager;
use crate::binder::{Binder, BoundStatement, ResolveQualifiedNameError};
use crate::catalog::catalog_service::{CatalogReader, CatalogWriter, CatalogWriterImpl};
use crate::catalog::connection_catalog::ConnectionCatalog;
use crate::catalog::root_catalog::{Catalog, SchemaPath};
use crate::catalog::secret_catalog::SecretCatalog;
use crate::catalog::source_catalog::SourceCatalog;
use crate::catalog::subscription_catalog::SubscriptionCatalog;
use crate::catalog::{
    CatalogError, CatalogErrorInner, DatabaseId, OwnedByUserCatalog, SchemaId, TableId,
    check_schema_writable,
};
use crate::error::{
    ErrorCode, Result, RwError, bail_catalog_error, bail_permission_denied, bail_protocol_error,
};
use crate::handler::describe::infer_describe;
use crate::handler::extended_handle::{
    Portal, PrepareStatement, handle_bind, handle_execute, handle_parse,
};
use crate::handler::privilege::ObjectCheckItem;
use crate::handler::show::{infer_show_create_object, infer_show_object};
use crate::handler::util::to_pg_field;
use crate::handler::variable::infer_show_variable;
use crate::handler::{RwPgResponse, handle};
use crate::health_service::HealthServiceImpl;
use crate::meta_client::{FrontendMetaClient, FrontendMetaClientImpl};
use crate::monitor::{CursorMetrics, FrontendMetrics, GLOBAL_FRONTEND_METRICS};
use crate::observer::FrontendObserverNode;
use crate::rpc::{FrontendServiceImpl, MonitorServiceImpl};
use crate::scheduler::streaming_manager::{StreamingJobTracker, StreamingJobTrackerRef};
use crate::scheduler::{
    DistributedQueryMetrics, GLOBAL_DISTRIBUTED_QUERY_METRICS, HummockSnapshotManager,
    HummockSnapshotManagerRef, QueryManager,
};
use crate::telemetry::FrontendTelemetryCreator;
use crate::user::UserId;
use crate::user::user_authentication::md5_hash_with_salt;
use crate::user::user_manager::UserInfoManager;
use crate::user::user_service::{UserInfoReader, UserInfoWriter, UserInfoWriterImpl};
use crate::{FrontendOpts, PgResponseStream, TableCatalog};

pub(crate) mod current;
pub(crate) mod cursor_manager;
pub(crate) mod transaction;

/// The global environment for the frontend server.
#[derive(Clone)]
pub(crate) struct FrontendEnv {
    // Different sessions may access the catalog at the same time, so it is protected by a
    // `RwLock`.
    meta_client: Arc<dyn FrontendMetaClient>,
    catalog_writer: Arc<dyn CatalogWriter>,
    catalog_reader: CatalogReader,
    user_info_writer: Arc<dyn UserInfoWriter>,
    user_info_reader: UserInfoReader,
    worker_node_manager: WorkerNodeManagerRef,
    query_manager: QueryManager,
    hummock_snapshot_manager: HummockSnapshotManagerRef,
    system_params_manager: LocalSystemParamsManagerRef,
    session_params: Arc<RwLock<SessionConfig>>,

    server_addr: HostAddr,
    client_pool: ComputeClientPoolRef,
    frontend_client_pool: FrontendClientPoolRef,
    monitor_client_pool: MonitorClientPoolRef,

    /// Each session is identified by (`process_id`, `secret_key`). When a cancel request is
    /// received, find the corresponding session and cancel all its running queries.
    sessions_map: SessionMapRef,

    pub frontend_metrics: Arc<FrontendMetrics>,

    pub cursor_metrics: Arc<CursorMetrics>,

    source_metrics: Arc<SourceMetrics>,

    /// Batch spill metrics.
    spill_metrics: Arc<BatchSpillMetrics>,

    batch_config: BatchConfig,
    frontend_config: FrontendConfig,
    #[expect(dead_code)]
    meta_config: MetaConfig,
    streaming_config: StreamingConfig,
    udf_config: UdfConfig,

    /// Tracks creating streaming jobs, used to cancel a creating streaming job when a cancel
    /// request is received.
    creating_streaming_job_tracker: StreamingJobTrackerRef,

    /// Runtime for compute-intensive tasks in the frontend, e.g. executors in local mode,
    /// or the root stage in MPP mode.
    compute_runtime: Arc<BackgroundShutdownRuntime>,

    /// Memory context used for batch executors in the frontend.
    mem_context: MemoryContext,

    /// Address of the serverless backfill controller.
    serverless_backfill_controller_addr: String,

    /// Prometheus client for querying metrics.
    prometheus_client: Option<PrometheusClient>,

    /// The additional selector used when querying Prometheus.
    prometheus_selector: String,
}

/// Session map identified by `(process_id, secret_key)`.
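///
/// A sketch of how a cancel request might resolve its target session under this keying scheme
/// (illustrative only; see `FrontendEnv::cancel_queries_in_session` for the real entry point):
///
/// ```ignore
/// let sessions: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
/// // On a cancel request carrying `process_id` and `secret_key` from the client:
/// if let Some(session) = sessions.read().get(&(process_id, secret_key)) {
///     session.cancel_current_query();
/// }
/// ```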
pub type SessionMapRef = Arc<RwLock<HashMap<(i32, i32), Arc<SessionImpl>>>>;

/// The proportion of frontend memory used for batch processing.
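/// For example, a frontend started with 4 GiB of total memory
/// (`frontend_total_memory_bytes`) reserves 4 GiB * 0.5 = 2 GiB for batch execution.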
const FRONTEND_BATCH_MEMORY_PROPORTION: f64 = 0.5;

impl FrontendEnv {
    pub fn mock() -> Self {
        use crate::test_utils::{MockCatalogWriter, MockFrontendMetaClient, MockUserInfoWriter};

        let catalog = Arc::new(RwLock::new(Catalog::default()));
        let meta_client = Arc::new(MockFrontendMetaClient {});
        let hummock_snapshot_manager = Arc::new(HummockSnapshotManager::new(meta_client.clone()));
        let catalog_writer = Arc::new(MockCatalogWriter::new(
            catalog.clone(),
            hummock_snapshot_manager.clone(),
        ));
        let catalog_reader = CatalogReader::new(catalog);
        let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
        let user_info_writer = Arc::new(MockUserInfoWriter::new(user_info_manager.clone()));
        let user_info_reader = UserInfoReader::new(user_info_manager);
        let worker_node_manager = Arc::new(WorkerNodeManager::mock(vec![]));
        let system_params_manager = Arc::new(LocalSystemParamsManager::for_test());
        let compute_client_pool = Arc::new(ComputeClientPool::for_test());
        let frontend_client_pool = Arc::new(FrontendClientPool::for_test());
        let monitor_client_pool = Arc::new(MonitorClientPool::for_test());
        let query_manager = QueryManager::new(
            worker_node_manager.clone(),
            compute_client_pool,
            catalog_reader.clone(),
            Arc::new(DistributedQueryMetrics::for_test()),
            None,
            None,
        );
        let server_addr = HostAddr::try_from("127.0.0.1:4565").unwrap();
        let client_pool = Arc::new(ComputeClientPool::for_test());
        let creating_streaming_tracker = StreamingJobTracker::new(meta_client.clone());
        let runtime = {
            let mut builder = Builder::new_multi_thread();
            if let Some(frontend_compute_runtime_worker_threads) =
                load_config("", FrontendOpts::default())
                    .batch
                    .frontend_compute_runtime_worker_threads
            {
                builder.worker_threads(frontend_compute_runtime_worker_threads);
            }
            builder
                .thread_name("rw-batch-local")
                .enable_all()
                .build()
                .unwrap()
        };
        let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));
        let sessions_map = Arc::new(RwLock::new(HashMap::new()));
        Self {
            meta_client,
            catalog_writer,
            catalog_reader,
            user_info_writer,
            user_info_reader,
            worker_node_manager,
            query_manager,
            hummock_snapshot_manager,
            system_params_manager,
            session_params: Default::default(),
            server_addr,
            client_pool,
            frontend_client_pool,
            monitor_client_pool,
            sessions_map,
            frontend_metrics: Arc::new(FrontendMetrics::for_test()),
            cursor_metrics: Arc::new(CursorMetrics::for_test()),
            batch_config: BatchConfig::default(),
            frontend_config: FrontendConfig::default(),
            meta_config: MetaConfig::default(),
            streaming_config: StreamingConfig::default(),
            udf_config: UdfConfig::default(),
            source_metrics: Arc::new(SourceMetrics::default()),
            spill_metrics: BatchSpillMetrics::for_test(),
            creating_streaming_job_tracker: Arc::new(creating_streaming_tracker),
            compute_runtime,
            mem_context: MemoryContext::none(),
            serverless_backfill_controller_addr: Default::default(),
            prometheus_client: None,
            prometheus_selector: String::new(),
        }
    }

    pub async fn init(opts: FrontendOpts) -> Result<(Self, Vec<JoinHandle<()>>, Vec<Sender<()>>)> {
        let config = load_config(&opts.config_path, &opts);
        info!("Starting frontend node");
        info!("> config: {:?}", config);
        info!(
            "> debug assertions: {}",
            if cfg!(debug_assertions) { "on" } else { "off" }
        );
        info!("> version: {} ({})", RW_VERSION, GIT_SHA);

        let frontend_address: HostAddr = opts
            .advertise_addr
            .as_ref()
            .unwrap_or_else(|| {
                tracing::warn!("advertise addr is not specified, defaulting to listen_addr");
                &opts.listen_addr
            })
            .parse()
            .unwrap();
        info!("advertise addr is {}", frontend_address);

        let rpc_addr: HostAddr = opts.frontend_rpc_listener_addr.parse().unwrap();
        let internal_rpc_host_addr = HostAddr {
            // Use the host of the advertise address for the frontend RPC address.
            host: frontend_address.host.clone(),
            port: rpc_addr.port,
        };
        // Register with the meta service by calling the `AddWorkerNode` RPC.
        let (meta_client, system_params_reader) = MetaClient::register_new(
            opts.meta_addr,
            WorkerType::Frontend,
            &frontend_address,
            AddWorkerNodeProperty {
                internal_rpc_host_addr: internal_rpc_host_addr.to_string(),
                ..Default::default()
            },
            Arc::new(config.meta.clone()),
        )
        .await;

        let worker_id = meta_client.worker_id();
        info!("Assigned worker node id {}", worker_id);

        let (heartbeat_join_handle, heartbeat_shutdown_sender) = MetaClient::start_heartbeat_loop(
            meta_client.clone(),
            Duration::from_millis(config.server.heartbeat_interval_ms as u64),
        );
        let mut join_handles = vec![heartbeat_join_handle];
        let mut shutdown_senders = vec![heartbeat_shutdown_sender];

        let frontend_meta_client = Arc::new(FrontendMetaClientImpl(meta_client.clone()));
        let hummock_snapshot_manager =
            Arc::new(HummockSnapshotManager::new(frontend_meta_client.clone()));

        let (catalog_updated_tx, catalog_updated_rx) = watch::channel(0);
        let catalog = Arc::new(RwLock::new(Catalog::default()));
        let catalog_writer = Arc::new(CatalogWriterImpl::new(
            meta_client.clone(),
            catalog_updated_rx.clone(),
            hummock_snapshot_manager.clone(),
        ));
        let catalog_reader = CatalogReader::new(catalog.clone());

        let worker_node_manager = Arc::new(WorkerNodeManager::new());

        let compute_client_pool = Arc::new(ComputeClientPool::new(
            config.batch_exchange_connection_pool_size(),
            config.batch.developer.compute_client_config.clone(),
        ));
        let frontend_client_pool = Arc::new(FrontendClientPool::new(
            1,
            config.batch.developer.frontend_client_config.clone(),
        ));
        let monitor_client_pool = Arc::new(MonitorClientPool::new(1, RpcClientConfig::default()));
        let query_manager = QueryManager::new(
            worker_node_manager.clone(),
            compute_client_pool.clone(),
            catalog_reader.clone(),
            Arc::new(GLOBAL_DISTRIBUTED_QUERY_METRICS.clone()),
            config.batch.distributed_query_limit,
            config.batch.max_batch_queries_per_frontend_node,
        );

        let user_info_manager = Arc::new(RwLock::new(UserInfoManager::default()));
        let user_info_reader = UserInfoReader::new(user_info_manager.clone());
        let user_info_writer = Arc::new(UserInfoWriterImpl::new(
            meta_client.clone(),
            catalog_updated_rx,
        ));

        let system_params_manager =
            Arc::new(LocalSystemParamsManager::new(system_params_reader.clone()));

        LocalSecretManager::init(
            opts.temp_secret_file_dir,
            meta_client.cluster_id().to_owned(),
            worker_id,
        );

        // This `session_params` should be initialized during the initial notification in `observer_manager`
        let session_params = Arc::new(RwLock::new(SessionConfig::default()));
        let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
        let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));

        let frontend_observer_node = FrontendObserverNode::new(
            worker_node_manager.clone(),
            catalog,
            catalog_updated_tx,
            user_info_manager,
            hummock_snapshot_manager.clone(),
            system_params_manager.clone(),
            session_params.clone(),
            compute_client_pool.clone(),
        );
        let observer_manager =
            ObserverManager::new_with_meta_client(meta_client.clone(), frontend_observer_node)
                .await;
        let observer_join_handle = observer_manager.start().await;
        join_handles.push(observer_join_handle);

        meta_client.activate(&frontend_address).await?;

        let frontend_metrics = Arc::new(GLOBAL_FRONTEND_METRICS.clone());
        let source_metrics = Arc::new(GLOBAL_SOURCE_METRICS.clone());
        let spill_metrics = Arc::new(GLOBAL_BATCH_SPILL_METRICS.clone());

        if config.server.metrics_level > MetricLevel::Disabled {
            MetricsManager::boot_metrics_service(opts.prometheus_listener_addr.clone());
        }

        let health_srv = HealthServiceImpl::new();
        let frontend_srv = FrontendServiceImpl::new(sessions_map.clone());
        let monitor_srv = MonitorServiceImpl::new(config.server.clone());
        let frontend_rpc_addr = opts.frontend_rpc_listener_addr.parse().unwrap();

        let telemetry_manager = TelemetryManager::new(
            Arc::new(meta_client.clone()),
            Arc::new(FrontendTelemetryCreator::new()),
        );
        // If the TOML config file or an environment variable disables telemetry, do not watch
        // for system parameter changes: if any of the configs disables telemetry, we should never start it.
        if config.server.telemetry_enabled && telemetry_env_enabled() {
            let (join_handle, shutdown_sender) = telemetry_manager.start().await;
            join_handles.push(join_handle);
            shutdown_senders.push(shutdown_sender);
        } else {
            tracing::info!("Telemetry didn't start due to config");
        }

        tokio::spawn(async move {
            tonic::transport::Server::builder()
                .add_service(HealthServer::new(health_srv))
                .add_service(FrontendServiceServer::new(frontend_srv))
                .add_service(MonitorServiceServer::new(monitor_srv))
                .serve(frontend_rpc_addr)
                .await
                .unwrap();
        });
        info!(
            "Health Check RPC Listener is set up on {}",
            opts.frontend_rpc_listener_addr.clone()
        );

        let creating_streaming_job_tracker =
            Arc::new(StreamingJobTracker::new(frontend_meta_client.clone()));

        let runtime = {
            let mut builder = Builder::new_multi_thread();
            if let Some(frontend_compute_runtime_worker_threads) =
                config.batch.frontend_compute_runtime_worker_threads
            {
                builder.worker_threads(frontend_compute_runtime_worker_threads);
            }
            builder
                .thread_name("rw-batch-local")
                .enable_all()
                .build()
                .unwrap()
        };
        let compute_runtime = Arc::new(BackgroundShutdownRuntime::from(runtime));

        let sessions = sessions_map.clone();
        // Idle transaction background monitor
        let join_handle = tokio::spawn(async move {
            let mut check_idle_txn_interval =
                tokio::time::interval(core::time::Duration::from_secs(10));
            check_idle_txn_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
            check_idle_txn_interval.reset();
            loop {
                check_idle_txn_interval.tick().await;
                sessions.read().values().for_each(|session| {
                    let _ = session.check_idle_in_transaction_timeout();
                })
            }
        });
        join_handles.push(join_handle);

        // Clean up the spill directory.
        #[cfg(not(madsim))]
        if config.batch.enable_spill {
            SpillOp::clean_spill_directory()
                .await
                .map_err(|err| anyhow!(err))?;
        }

        let total_memory_bytes = opts.frontend_total_memory_bytes;
        let heap_profiler =
            HeapProfiler::new(total_memory_bytes, config.server.heap_profiling.clone());
        // Run a background heap profiler
        heap_profiler.start();

        let batch_memory_limit = total_memory_bytes as f64 * FRONTEND_BATCH_MEMORY_PROPORTION;
        let mem_context = MemoryContext::root(
            frontend_metrics.batch_total_mem.clone(),
            batch_memory_limit as u64,
        );

        // Initialize Prometheus client if endpoint is provided
        let prometheus_client = if let Some(ref endpoint) = opts.prometheus_endpoint {
            match PrometheusClient::try_from(endpoint.as_str()) {
                Ok(client) => {
                    info!("Prometheus client initialized with endpoint: {}", endpoint);
                    Some(client)
                }
                Err(e) => {
                    error!(
                        error = %e.as_report(),
                        "failed to initialize Prometheus client",
                    );
                    None
                }
            }
        } else {
            None
        };

        // Initialize Prometheus selector
        let prometheus_selector = opts.prometheus_selector.unwrap_or_default();
        info!(
            "Frontend total_memory: {} batch_memory: {}",
            convert(total_memory_bytes as _),
            convert(batch_memory_limit as _),
        );

        Ok((
            Self {
                catalog_reader,
                catalog_writer,
                user_info_reader,
                user_info_writer,
                worker_node_manager,
                meta_client: frontend_meta_client,
                query_manager,
                hummock_snapshot_manager,
                system_params_manager,
                session_params,
                server_addr: frontend_address,
                client_pool: compute_client_pool,
                frontend_client_pool,
                monitor_client_pool,
                frontend_metrics,
                cursor_metrics,
                spill_metrics,
                sessions_map,
                batch_config: config.batch,
                frontend_config: config.frontend,
                meta_config: config.meta,
                streaming_config: config.streaming,
                serverless_backfill_controller_addr: opts.serverless_backfill_controller_addr,
                udf_config: config.udf,
                source_metrics,
                creating_streaming_job_tracker,
                compute_runtime,
                mem_context,
                prometheus_client,
                prometheus_selector,
            },
            join_handles,
            shutdown_senders,
        ))
    }

    /// Get a reference to the frontend env's catalog writer.
    ///
    /// This method is intentionally private, and a write guard is required for the caller to
    /// prove that the write operations are permitted in the current transaction.
    fn catalog_writer(&self, _guard: transaction::WriteGuard) -> &dyn CatalogWriter {
        &*self.catalog_writer
    }

    /// Get a reference to the frontend env's catalog reader.
    pub fn catalog_reader(&self) -> &CatalogReader {
        &self.catalog_reader
    }

    /// Get a reference to the frontend env's user info writer.
    ///
    /// This method is intentionally private, and a write guard is required for the caller to
    /// prove that the write operations are permitted in the current transaction.
    fn user_info_writer(&self, _guard: transaction::WriteGuard) -> &dyn UserInfoWriter {
        &*self.user_info_writer
    }

    /// Get a reference to the frontend env's user info reader.
    pub fn user_info_reader(&self) -> &UserInfoReader {
        &self.user_info_reader
    }

    pub fn worker_node_manager_ref(&self) -> WorkerNodeManagerRef {
        self.worker_node_manager.clone()
    }

    pub fn meta_client(&self) -> &dyn FrontendMetaClient {
        &*self.meta_client
    }

    pub fn meta_client_ref(&self) -> Arc<dyn FrontendMetaClient> {
        self.meta_client.clone()
    }

    pub fn query_manager(&self) -> &QueryManager {
        &self.query_manager
    }

    pub fn hummock_snapshot_manager(&self) -> &HummockSnapshotManagerRef {
        &self.hummock_snapshot_manager
    }

    pub fn system_params_manager(&self) -> &LocalSystemParamsManagerRef {
        &self.system_params_manager
    }

    pub fn session_params_snapshot(&self) -> SessionConfig {
        self.session_params.read_recursive().clone()
    }

    pub fn sbc_address(&self) -> &String {
        &self.serverless_backfill_controller_addr
    }

    /// Get a reference to the Prometheus client if available.
    pub fn prometheus_client(&self) -> Option<&PrometheusClient> {
        self.prometheus_client.as_ref()
    }

    /// Get the Prometheus selector string.
    pub fn prometheus_selector(&self) -> &str {
        &self.prometheus_selector
    }

    pub fn server_address(&self) -> &HostAddr {
        &self.server_addr
    }

    pub fn client_pool(&self) -> ComputeClientPoolRef {
        self.client_pool.clone()
    }

    pub fn frontend_client_pool(&self) -> FrontendClientPoolRef {
        self.frontend_client_pool.clone()
    }

    pub fn monitor_client_pool(&self) -> MonitorClientPoolRef {
        self.monitor_client_pool.clone()
    }

    pub fn batch_config(&self) -> &BatchConfig {
        &self.batch_config
    }

    pub fn frontend_config(&self) -> &FrontendConfig {
        &self.frontend_config
    }

    pub fn streaming_config(&self) -> &StreamingConfig {
        &self.streaming_config
    }

    pub fn udf_config(&self) -> &UdfConfig {
        &self.udf_config
    }

    pub fn source_metrics(&self) -> Arc<SourceMetrics> {
        self.source_metrics.clone()
    }

    pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
        self.spill_metrics.clone()
    }

    pub fn creating_streaming_job_tracker(&self) -> &StreamingJobTrackerRef {
        &self.creating_streaming_job_tracker
    }

    pub fn sessions_map(&self) -> &SessionMapRef {
        &self.sessions_map
    }

    pub fn compute_runtime(&self) -> Arc<BackgroundShutdownRuntime> {
        self.compute_runtime.clone()
    }

    /// Cancel queries (i.e. batch queries) in the session.
    /// Returns `true` if the session exists, otherwise `false`.
    pub fn cancel_queries_in_session(&self, session_id: SessionId) -> bool {
        cancel_queries_in_session(session_id, self.sessions_map.clone())
    }

    /// Cancel creating jobs (i.e. streaming queries) in the session.
    /// Returns `true` if the session exists, otherwise `false`.
    pub fn cancel_creating_jobs_in_session(&self, session_id: SessionId) -> bool {
        cancel_creating_jobs_in_session(session_id, self.sessions_map.clone())
    }

    pub fn mem_context(&self) -> MemoryContext {
        self.mem_context.clone()
    }
}

#[derive(Clone)]
pub struct AuthContext {
    pub database: String,
    pub user_name: String,
    pub user_id: UserId,
}

impl AuthContext {
    pub fn new(database: String, user_name: String, user_id: UserId) -> Self {
        Self {
            database,
            user_name,
            user_id,
        }
    }
}

pub struct SessionImpl {
    env: FrontendEnv,
    auth_context: Arc<RwLock<AuthContext>>,
    /// Used for user authentication.
    user_authenticator: UserAuthenticator,
    /// Stores the value of configurations.
    config_map: Arc<RwLock<SessionConfig>>,

    /// Channel sender for frontend handlers to send notices.
    notice_tx: UnboundedSender<String>,
    /// Channel receiver for pgwire to take notices and send them to clients.
    notice_rx: Mutex<UnboundedReceiver<String>>,

    /// Identified by (`process_id`, `secret_key`). Corresponds to `SessionManager`.
    id: (i32, i32),

    /// Client address.
    peer_addr: AddressRef,

    /// Transaction state.
    /// TODO: get rid of the `Mutex` here; it is a workaround for the `Send` requirement of
    /// async functions, and there should actually be no contention.
    txn: Arc<Mutex<transaction::State>>,

    /// Query cancel flag.
    /// This flag is set only when the current query is executed in local mode, and is used to
    /// cancel the local query.
    current_query_cancel_flag: Mutex<Option<ShutdownSender>>,

    /// Execution context, representing the lifetime of a running SQL statement in the current
    /// session.
    exec_context: Mutex<Option<Weak<ExecContext>>>,

    /// Last idle instant.
    last_idle_instant: Arc<Mutex<Option<Instant>>>,

    cursor_manager: Arc<CursorManager>,

    /// Temporary sources for the current session.
    temporary_source_manager: Arc<Mutex<TemporarySourceManager>>,

    /// Staging catalogs for the current session.
    staging_catalog_manager: Arc<Mutex<StagingCatalogManager>>,
}

/// If TEMPORARY or TEMP is specified, the source is created as a temporary source.
/// Temporary sources are automatically dropped at the end of a session.
/// Temporary sources are expected to be selected by batch queries, not streaming queries.
/// Temporary sources are currently only used by the cloud portal to preview data during table
/// and source creation, so this is an internal feature not exposed to users.
/// Temporary sources are supported with minimum effort: we don't track the database and schema
/// names, only the source name.
/// Temporary sources can only be shown via the `SHOW SOURCES` command, not via other system tables.
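///
/// A minimal usage sketch of the manager itself (hypothetical; constructing a `SourceCatalog`
/// by hand is elided, as it is normally produced by the binder):
///
/// ```ignore
/// let mut manager = TemporarySourceManager::new();
/// manager.create_source("my_source".to_owned(), source_catalog);
/// assert!(manager.get_source("my_source").is_some());
/// manager.drop_source("my_source");
/// assert!(manager.get_source("my_source").is_none());
/// ```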
#[derive(Default, Clone)]
pub struct TemporarySourceManager {
    sources: HashMap<String, SourceCatalog>,
}

impl TemporarySourceManager {
    pub fn new() -> Self {
        Self {
            sources: HashMap::new(),
        }
    }

    pub fn create_source(&mut self, name: String, source: SourceCatalog) {
        self.sources.insert(name, source);
    }

    pub fn drop_source(&mut self, name: &str) {
        self.sources.remove(name);
    }

    pub fn get_source(&self, name: &str) -> Option<&SourceCatalog> {
        self.sources.get(name)
    }

    pub fn keys(&self) -> Vec<String> {
        self.sources.keys().cloned().collect()
    }
}

/// Staging catalog manager is used to manage tables being created in the current session.
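///
/// A minimal usage sketch (hypothetical; `TableCatalog` construction is elided):
///
/// ```ignore
/// let mut staging = StagingCatalogManager::new();
/// staging.create_table("t1".to_owned(), table_catalog);
/// assert!(staging.get_table("t1").is_some());
/// staging.drop_table("t1");
/// ```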
#[derive(Default, Clone)]
pub struct StagingCatalogManager {
    // Staging tables being created in the current session.
    tables: HashMap<String, TableCatalog>,
}

impl StagingCatalogManager {
    pub fn new() -> Self {
        Self {
            tables: HashMap::new(),
        }
    }

    pub fn create_table(&mut self, name: String, table: TableCatalog) {
        self.tables.insert(name, table);
    }

    pub fn drop_table(&mut self, name: &str) {
        self.tables.remove(name);
    }

    pub fn get_table(&self, name: &str) -> Option<&TableCatalog> {
        self.tables.get(name)
    }
}

#[derive(Error, Debug)]
pub enum CheckRelationError {
    #[error("{0}")]
    Resolve(#[from] ResolveQualifiedNameError),
    #[error("{0}")]
    Catalog(#[from] CatalogError),
}

impl From<CheckRelationError> for RwError {
    fn from(e: CheckRelationError) -> Self {
        match e {
            CheckRelationError::Resolve(e) => e.into(),
            CheckRelationError::Catalog(e) => e.into(),
        }
    }
}

impl SessionImpl {
    pub(crate) fn new(
        env: FrontendEnv,
        auth_context: AuthContext,
        user_authenticator: UserAuthenticator,
        id: SessionId,
        peer_addr: AddressRef,
        session_config: SessionConfig,
    ) -> Self {
        let cursor_metrics = env.cursor_metrics.clone();
        let (notice_tx, notice_rx) = mpsc::unbounded_channel();

        Self {
            env,
            auth_context: Arc::new(RwLock::new(auth_context)),
            user_authenticator,
            config_map: Arc::new(RwLock::new(session_config)),
            id,
            peer_addr,
            txn: Default::default(),
            current_query_cancel_flag: Mutex::new(None),
            notice_tx,
            notice_rx: Mutex::new(notice_rx),
            exec_context: Mutex::new(None),
            last_idle_instant: Default::default(),
            cursor_manager: Arc::new(CursorManager::new(cursor_metrics)),
            temporary_source_manager: Default::default(),
            staging_catalog_manager: Default::default(),
        }
    }

    #[cfg(test)]
    pub fn mock() -> Self {
        let env = FrontendEnv::mock();
        let (notice_tx, notice_rx) = mpsc::unbounded_channel();

        Self {
            env: FrontendEnv::mock(),
            auth_context: Arc::new(RwLock::new(AuthContext::new(
                DEFAULT_DATABASE_NAME.to_owned(),
                DEFAULT_SUPER_USER.to_owned(),
                DEFAULT_SUPER_USER_ID,
            ))),
            user_authenticator: UserAuthenticator::None,
            config_map: Default::default(),
            // Mock sessions use a nonsense id.
            id: (0, 0),
            txn: Default::default(),
            current_query_cancel_flag: Mutex::new(None),
            notice_tx,
            notice_rx: Mutex::new(notice_rx),
            exec_context: Mutex::new(None),
            peer_addr: Address::Tcp(SocketAddr::new(
                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
                8080,
            ))
            .into(),
            last_idle_instant: Default::default(),
            cursor_manager: Arc::new(CursorManager::new(env.cursor_metrics)),
            temporary_source_manager: Default::default(),
            staging_catalog_manager: Default::default(),
        }
    }

    pub(crate) fn env(&self) -> &FrontendEnv {
        &self.env
    }

    pub fn auth_context(&self) -> Arc<AuthContext> {
        let ctx = self.auth_context.read();
        Arc::new(ctx.clone())
    }

    pub fn database(&self) -> String {
        self.auth_context.read().database.clone()
    }

    pub fn database_id(&self) -> DatabaseId {
        let db_name = self.database();
        self.env
            .catalog_reader()
            .read_guard()
            .get_database_by_name(&db_name)
            .map(|db| db.id())
            .expect("session database not found")
    }

    pub fn user_name(&self) -> String {
        self.auth_context.read().user_name.clone()
    }

    pub fn user_id(&self) -> UserId {
        self.auth_context.read().user_id
    }

    pub fn update_database(&self, database: String) {
        self.auth_context.write().database = database;
    }

    pub fn shared_config(&self) -> Arc<RwLock<SessionConfig>> {
        Arc::clone(&self.config_map)
    }

    pub fn config(&self) -> RwLockReadGuard<'_, SessionConfig> {
        self.config_map.read()
    }

    pub fn set_config(&self, key: &str, value: String) -> Result<String> {
        self.config_map
            .write()
            .set(key, value, &mut ())
            .map_err(Into::into)
    }

    pub fn reset_config(&self, key: &str) -> Result<String> {
        self.config_map
            .write()
            .reset(key, &mut ())
            .map_err(Into::into)
    }

    pub fn set_config_report(
        &self,
        key: &str,
        value: Option<String>,
        mut reporter: impl ConfigReporter,
    ) -> Result<String> {
        if let Some(value) = value {
            self.config_map
                .write()
                .set(key, value, &mut reporter)
                .map_err(Into::into)
        } else {
            self.config_map
                .write()
                .reset(key, &mut reporter)
                .map_err(Into::into)
        }
    }

    pub fn session_id(&self) -> SessionId {
        self.id
    }

    pub fn running_sql(&self) -> Option<Arc<str>> {
        self.exec_context
            .lock()
            .as_ref()
            .and_then(|weak| weak.upgrade())
            .map(|context| context.running_sql.clone())
    }

    pub fn get_cursor_manager(&self) -> Arc<CursorManager> {
        self.cursor_manager.clone()
    }

    pub fn peer_addr(&self) -> &Address {
        &self.peer_addr
    }

    pub fn elapse_since_running_sql(&self) -> Option<u128> {
        self.exec_context
            .lock()
            .as_ref()
            .and_then(|weak| weak.upgrade())
            .map(|context| context.last_instant.elapsed().as_millis())
    }

    pub fn elapse_since_last_idle_instant(&self) -> Option<u128> {
        self.last_idle_instant
            .lock()
            .as_ref()
            .map(|x| x.elapsed().as_millis())
    }

    pub fn check_relation_name_duplicated(
        &self,
        name: ObjectName,
        stmt_type: StatementType,
        if_not_exists: bool,
    ) -> std::result::Result<Either<(), RwPgResponse>, CheckRelationError> {
        let db_name = &self.database();
        let catalog_reader = self.env().catalog_reader().read_guard();
        let (schema_name, relation_name) = {
            let (schema_name, relation_name) =
                Binder::resolve_schema_qualified_name(db_name, &name)?;
            let search_path = self.config().search_path();
            let user_name = &self.user_name();
            let schema_name = match schema_name {
                Some(schema_name) => schema_name,
                None => catalog_reader
                    .first_valid_schema(db_name, &search_path, user_name)?
                    .name(),
            };
            (schema_name, relation_name)
        };
        match catalog_reader.check_relation_name_duplicated(db_name, &schema_name, &relation_name) {
            Err(e) if if_not_exists => {
                if let CatalogErrorInner::Duplicated {
                    name,
                    under_creation,
                    ..
                } = e.inner()
                {
                    // If the relation has been created, return directly.
                    // Otherwise, the job status is `is_creating`. Since the frontend receives the
                    // catalog asynchronously, we can't determine the real status on the meta node
                    // at this time, so we regard the relation as `not_exists` and delay the check
                    // to the meta node. Only the types in `StreamingJob` (defined in
                    // streaming_job.rs) and `Subscription` may be `is_creating`.
                    if !*under_creation {
                        Ok(Either::Right(
                            PgResponse::builder(stmt_type)
                                .notice(format!("relation \"{}\" already exists, skipping", name))
                                .into(),
                        ))
                    } else if stmt_type == StatementType::CREATE_SUBSCRIPTION {
                        // For now, when a subscription is still being created, we return directly
                        // with an additional message.
                        // TODO: `Subscription` should also be processed in the same way as `StreamingJob`.
                        Ok(Either::Right(
                            PgResponse::builder(stmt_type)
                                .notice(format!(
                                    "relation \"{}\" already exists but still creating, skipping",
                                    name
                                ))
                                .into(),
                        ))
                    } else {
                        Ok(Either::Left(()))
                    }
                } else {
                    Err(e.into())
                }
            }
            Err(e) => Err(e.into()),
            Ok(_) => Ok(Either::Left(())),
        }
    }

    pub fn check_secret_name_duplicated(&self, name: ObjectName) -> Result<()> {
        let db_name = &self.database();
        let catalog_reader = self.env().catalog_reader().read_guard();
        let (schema_name, secret_name) = {
            let (schema_name, secret_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
            let search_path = self.config().search_path();
            let user_name = &self.user_name();
            let schema_name = match schema_name {
                Some(schema_name) => schema_name,
                None => catalog_reader
                    .first_valid_schema(db_name, &search_path, user_name)?
                    .name(),
            };
            (schema_name, secret_name)
        };
        catalog_reader
            .check_secret_name_duplicated(db_name, &schema_name, &secret_name)
            .map_err(RwError::from)
    }

    pub fn check_connection_name_duplicated(&self, name: ObjectName) -> Result<()> {
        let db_name = &self.database();
        let catalog_reader = self.env().catalog_reader().read_guard();
        let (schema_name, connection_name) = {
            let (schema_name, connection_name) =
                Binder::resolve_schema_qualified_name(db_name, &name)?;
            let search_path = self.config().search_path();
            let user_name = &self.user_name();
            let schema_name = match schema_name {
                Some(schema_name) => schema_name,
                None => catalog_reader
                    .first_valid_schema(db_name, &search_path, user_name)?
                    .name(),
            };
            (schema_name, connection_name)
        };
        catalog_reader
            .check_connection_name_duplicated(db_name, &schema_name, &connection_name)
            .map_err(RwError::from)
    }

    pub fn check_function_name_duplicated(
        &self,
        stmt_type: StatementType,
        name: ObjectName,
        arg_types: &[DataType],
        if_not_exists: bool,
    ) -> Result<Either<(), RwPgResponse>> {
        let db_name = &self.database();
        let (schema_name, function_name) = Binder::resolve_schema_qualified_name(db_name, &name)?;
        let (database_id, schema_id) = self.get_database_and_schema_id_for_create(schema_name)?;

        let catalog_reader = self.env().catalog_reader().read_guard();
        if catalog_reader
            .get_schema_by_id(database_id, schema_id)?
            .get_function_by_name_args(&function_name, arg_types)
            .is_some()
        {
            let full_name = format!(
                "{function_name}({})",
                arg_types.iter().map(|t| t.to_string()).join(",")
            );
            if if_not_exists {
                Ok(Either::Right(
                    PgResponse::builder(stmt_type)
                        .notice(format!(
                            "function \"{}\" already exists, skipping",
                            full_name
                        ))
                        .into(),
                ))
            } else {
                Err(CatalogError::duplicated("function", full_name).into())
            }
        } else {
            Ok(Either::Left(()))
        }
    }

    /// Also check if the user has the privilege to create in the schema.
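    ///
    /// A usage sketch (illustrative only; assumes the target schema exists):
    ///
    /// ```ignore
    /// let (db_id, schema_id) =
    ///     session.get_database_and_schema_id_for_create(Some("public".to_owned()))?;
    /// ```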
    pub fn get_database_and_schema_id_for_create(
        &self,
        schema_name: Option<String>,
    ) -> Result<(DatabaseId, SchemaId)> {
        let db_name = &self.database();

        let search_path = self.config().search_path();
        let user_name = &self.user_name();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let schema = match schema_name {
            Some(schema_name) => catalog_reader.get_schema_by_name(db_name, &schema_name)?,
            None => catalog_reader.first_valid_schema(db_name, &search_path, user_name)?,
        };
        let schema_name = schema.name();

        check_schema_writable(&schema_name)?;
        self.check_privileges(&[ObjectCheckItem::new(
            schema.owner(),
            AclMode::Create,
            schema_name,
            schema.id(),
        )])?;

        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
        Ok((db_id, schema.id()))
    }

    pub fn get_connection_by_name(
        &self,
        schema_name: Option<String>,
        connection_name: &str,
    ) -> Result<Arc<ConnectionCatalog>> {
        let db_name = &self.database();
        let search_path = self.config().search_path();
        let user_name = &self.user_name();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
        let (connection, _) =
            catalog_reader.get_connection_by_name(db_name, schema_path, connection_name)?;

        self.check_privileges(&[ObjectCheckItem::new(
            connection.owner(),
            AclMode::Usage,
            connection.name.clone(),
            connection.id,
        )])?;

        Ok(connection.clone())
    }

    pub fn get_subscription_by_schema_id_name(
        &self,
        schema_id: SchemaId,
        subscription_name: &str,
    ) -> Result<Arc<SubscriptionCatalog>> {
        let db_name = &self.database();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let db_id = catalog_reader.get_database_by_name(db_name)?.id();
        let schema = catalog_reader.get_schema_by_id(db_id, schema_id)?;
        let subscription = schema
            .get_subscription_by_name(subscription_name)
            .ok_or_else(|| {
                RwError::from(ErrorCode::ItemNotFound(format!(
                    "subscription {} not found",
                    subscription_name
                )))
            })?;
        Ok(subscription.clone())
    }

    pub fn get_subscription_by_name(
        &self,
        schema_name: Option<String>,
        subscription_name: &str,
    ) -> Result<Arc<SubscriptionCatalog>> {
        let db_name = &self.database();
        let search_path = self.config().search_path();
        let user_name = &self.user_name();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
        let (subscription, _) =
            catalog_reader.get_subscription_by_name(db_name, schema_path, subscription_name)?;
        Ok(subscription.clone())
    }

    pub fn get_table_by_id(&self, table_id: TableId) -> Result<Arc<TableCatalog>> {
        let catalog_reader = self.env().catalog_reader().read_guard();
        Ok(catalog_reader.get_any_table_by_id(table_id)?.clone())
    }

    pub fn get_table_by_name(
        &self,
        table_name: &str,
        db_id: DatabaseId,
        schema_id: SchemaId,
    ) -> Result<Arc<TableCatalog>> {
        let catalog_reader = self.env().catalog_reader().read_guard();
        let table = catalog_reader
            .get_schema_by_id(db_id, schema_id)?
            .get_created_table_by_name(table_name)
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::InvalidInput,
                    format!("table \"{}\" does not exist", table_name),
                )
            })?;

        self.check_privileges(&[ObjectCheckItem::new(
            table.owner(),
            AclMode::Select,
            table_name.to_owned(),
            table.id,
        )])?;

        Ok(table.clone())
    }

    pub fn get_secret_by_name(
        &self,
        schema_name: Option<String>,
        secret_name: &str,
    ) -> Result<Arc<SecretCatalog>> {
        let db_name = &self.database();
        let search_path = self.config().search_path();
        let user_name = &self.user_name();

        let catalog_reader = self.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
        let (secret, _) = catalog_reader.get_secret_by_name(db_name, schema_path, secret_name)?;

        self.check_privileges(&[ObjectCheckItem::new(
            secret.owner(),
            AclMode::Usage,
            secret.name.clone(),
            secret.id,
        )])?;

        Ok(secret.clone())
    }

    pub fn list_change_log_epochs(
        &self,
        table_id: TableId,
        min_epoch: u64,
        max_count: u32,
    ) -> Result<Vec<u64>> {
        Ok(self
            .env
            .hummock_snapshot_manager()
            .acquire()
            .list_change_log_epochs(table_id, min_epoch, max_count))
    }

    pub fn clear_cancel_query_flag(&self) {
        let mut flag = self.current_query_cancel_flag.lock();
        *flag = None;
    }

    pub fn reset_cancel_query_flag(&self) -> ShutdownToken {
        let mut flag = self.current_query_cancel_flag.lock();
        let (shutdown_tx, shutdown_rx) = ShutdownToken::new();
        *flag = Some(shutdown_tx);
        shutdown_rx
    }

    pub fn cancel_current_query(&self) {
        let mut flag_guard = self.current_query_cancel_flag.lock();
        if let Some(sender) = flag_guard.take() {
            info!("Trying to cancel query in local mode.");
            // The currently running query is in local mode.
            sender.cancel();
            info!("Cancel query request sent.");
        } else {
            info!("Trying to cancel query in distributed mode.");
            self.env.query_manager().cancel_queries_in_session(self.id)
        }
    }

    pub fn cancel_current_creating_job(&self) {
        self.env.creating_streaming_job_tracker.abort_jobs(self.id);
    }

    /// This function is only used in tests for now.
    /// Maybe we can remove it in the future.
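    ///
    /// A usage sketch (illustrative only):
    ///
    /// ```ignore
    /// let session: Arc<SessionImpl> = /* ... */;
    /// let response = session.run_statement(Arc::from("SELECT 1"), vec![]).await?;
    /// ```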
1359    pub async fn run_statement(
1360        self: Arc<Self>,
1361        sql: Arc<str>,
1362        formats: Vec<Format>,
1363    ) -> std::result::Result<PgResponse<PgResponseStream>, BoxedError> {
1364        // Parse sql.
1365        let mut stmts = Parser::parse_sql(&sql)?;
1366        if stmts.is_empty() {
1367            return Ok(PgResponse::empty_result(
1368                pgwire::pg_response::StatementType::EMPTY,
1369            ));
1370        }
1371        if stmts.len() > 1 {
1372            return Ok(
1373                PgResponse::builder(pgwire::pg_response::StatementType::EMPTY)
1374                    .notice("cannot insert multiple commands into statement")
1375                    .into(),
1376            );
1377        }
1378        let stmt = stmts.swap_remove(0);
1379        let rsp = handle(self, stmt, sql.clone(), formats).await?;
1380        Ok(rsp)
1381    }
1382
1383    pub fn notice_to_user(&self, str: impl Into<String>) {
1384        let notice = str.into();
1385        tracing::trace!(notice, "notice to user");
1386        self.notice_tx
1387            .send(notice)
1388            .expect("notice channel should not be closed");
1389    }
1390
1391    pub fn is_barrier_read(&self) -> bool {
1392        match self.config().visibility_mode() {
1393            VisibilityMode::Default => self.env.batch_config.enable_barrier_read,
1394            VisibilityMode::All => true,
1395            VisibilityMode::Checkpoint => false,
1396        }
1397    }
1398
1399    pub fn statement_timeout(&self) -> Duration {
1400        if self.config().statement_timeout() == 0 {
1401            Duration::from_secs(self.env.batch_config.statement_timeout_in_sec as u64)
1402        } else {
1403            Duration::from_secs(self.config().statement_timeout() as u64)
1404        }
1405    }
1406
1407    pub fn create_temporary_source(&self, source: SourceCatalog) {
1408        self.temporary_source_manager
1409            .lock()
1410            .create_source(source.name.clone(), source);
1411    }
1412
1413    pub fn get_temporary_source(&self, name: &str) -> Option<SourceCatalog> {
1414        self.temporary_source_manager
1415            .lock()
1416            .get_source(name)
1417            .cloned()
1418    }
1419
1420    pub fn drop_temporary_source(&self, name: &str) {
1421        self.temporary_source_manager.lock().drop_source(name);
1422    }
1423
1424    pub fn temporary_source_manager(&self) -> TemporarySourceManager {
1425        self.temporary_source_manager.lock().clone()
1426    }
1427
1428    pub fn create_staging_table(&self, table: TableCatalog) {
1429        self.staging_catalog_manager
1430            .lock()
1431            .create_table(table.name.clone(), table);
1432    }
1433
1434    pub fn drop_staging_table(&self, name: &str) {
1435        self.staging_catalog_manager.lock().drop_table(name);
1436    }
1437
1438    pub fn staging_catalog_manager(&self) -> StagingCatalogManager {
1439        self.staging_catalog_manager.lock().clone()
1440    }
1441
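    /// Checks the actor-count limits reported by the meta service (presumably invoked
    /// before creating streaming jobs): exceeding a soft limit only raises a notice to
    /// the user, while exceeding a hard limit aborts with a protocol error. Sessions with
    /// `bypass_cluster_limits` set to `true` skip the check entirely.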
1442    pub async fn check_cluster_limits(&self) -> Result<()> {
1443        if self.config().bypass_cluster_limits() {
1444            return Ok(());
1445        }
1446
1447        let gen_message = |ActorCountPerParallelism {
1448                               worker_id_to_actor_count,
1449                               hard_limit,
1450                               soft_limit,
1451                           }: ActorCountPerParallelism,
1452                           exceed_hard_limit: bool|
1453         -> String {
1454            let (limit_type, action) = if exceed_hard_limit {
1455                ("critical", "Scale the cluster immediately to proceed.")
1456            } else {
1457                (
1458                    "recommended",
1459                    "Consider scaling the cluster for optimal performance.",
1460                )
1461            };
1462            format!(
1463                r#"Actor count per parallelism exceeds the {limit_type} limit.
1464
1465Depending on your workload, this may overload the cluster and cause performance/stability issues. {action}
1466
1467HINT:
1468- For best practices on managing streaming jobs: https://docs.risingwave.com/operate/manage-a-large-number-of-streaming-jobs
1469- To bypass the check (if the cluster load is acceptable): `[ALTER SYSTEM] SET bypass_cluster_limits TO true`.
1470  See https://docs.risingwave.com/operate/view-configure-runtime-parameters#how-to-configure-runtime-parameters
1471- Contact us via slack or https://risingwave.com/contact-us/ for further enquiry.
1472
1473DETAILS:
1474- hard limit: {hard_limit}
1475- soft limit: {soft_limit}
1476- worker_id_to_actor_count: {worker_id_to_actor_count:?}"#,
1477            )
1478        };
1479
1480        let limits = self.env().meta_client().get_cluster_limits().await?;
1481        for limit in limits {
1482            match limit {
1483                cluster_limit::ClusterLimit::ActorCount(l) => {
1484                    if l.exceed_hard_limit() {
1485                        return Err(RwError::from(ErrorCode::ProtocolError(gen_message(
1486                            l, true,
1487                        ))));
1488                    } else if l.exceed_soft_limit() {
1489                        self.notice_to_user(gen_message(l, false));
1490                    }
1491                }
1492            }
1493        }
1494        Ok(())
1495    }
1496}
1497
1498pub static SESSION_MANAGER: std::sync::OnceLock<Arc<SessionManagerImpl>> =
1499    std::sync::OnceLock::new();
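// Illustrative initialization sketch (the real bootstrap lives elsewhere): the global is
// set once at startup and only read afterwards, e.g.
//
//     let manager = Arc::new(SessionManagerImpl::new(opts).await?);
//     SESSION_MANAGER.get_or_init(|| manager.clone());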
1500
1501pub struct SessionManagerImpl {
1502    env: FrontendEnv,
1503    _join_handles: Vec<JoinHandle<()>>,
1504    _shutdown_senders: Vec<Sender<()>>,
1505    number: AtomicI32,
1506}
1507
1508impl SessionManager for SessionManagerImpl {
1509    type Error = RwError;
1510    type Session = SessionImpl;
1511
1512    fn create_dummy_session(&self, database_id: DatabaseId) -> Result<Arc<Self::Session>> {
1513        let dummy_addr = Address::Tcp(SocketAddr::new(
1514            IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
1515            5691, // port of meta
1516        ));
1517
1518        // Always use the built-in super user for dummy sessions.
1519        // This avoids permission checks tied to a specific user_id.
1520        self.connect_inner(
1521            database_id,
1522            risingwave_common::catalog::DEFAULT_SUPER_USER,
1523            Arc::new(dummy_addr),
1524        )
1525    }
1526
1527    fn connect(
1528        &self,
1529        database: &str,
1530        user_name: &str,
1531        peer_addr: AddressRef,
1532    ) -> Result<Arc<Self::Session>> {
1533        let catalog_reader = self.env.catalog_reader();
1534        let reader = catalog_reader.read_guard();
1535        let database_id = reader.get_database_by_name(database)?.id();
1536
1537        self.connect_inner(database_id, user_name, peer_addr)
1538    }
1539
1540    /// Used when a cancel request arrives.
1541    fn cancel_queries_in_session(&self, session_id: SessionId) {
1542        self.env.cancel_queries_in_session(session_id);
1543    }
1544
1545    fn cancel_creating_jobs_in_session(&self, session_id: SessionId) {
1546        self.env.cancel_creating_jobs_in_session(session_id);
1547    }
1548
1549    fn end_session(&self, session: &Self::Session) {
1550        self.delete_session(&session.session_id());
1551    }
1552
1553    async fn shutdown(&self) {
1554        // Clean up the session map.
1555        self.env.sessions_map().write().clear();
1556        // Unregister from the meta service.
1557        self.env.meta_client().try_unregister().await;
1558    }
1559}
1560
1561impl SessionManagerImpl {
1562    pub async fn new(opts: FrontendOpts) -> Result<Self> {
1563        // TODO(shutdown): only save join handles that **need** to be shutdown
1564        let (env, join_handles, shutdown_senders) = FrontendEnv::init(opts).await?;
1565        Ok(Self {
1566            env,
1567            _join_handles: join_handles,
1568            _shutdown_senders: shutdown_senders,
1569            number: AtomicI32::new(0),
1570        })
1571    }
1572
1573    pub(crate) fn env(&self) -> &FrontendEnv {
1574        &self.env
1575    }
1576
1577    fn insert_session(&self, session: Arc<SessionImpl>) {
1578        let active_sessions = {
1579            let mut write_guard = self.env.sessions_map.write();
1580            write_guard.insert(session.id(), session);
1581            write_guard.len()
1582        };
1583        self.env
1584            .frontend_metrics
1585            .active_sessions
1586            .set(active_sessions as i64);
1587    }
1588
1589    fn delete_session(&self, session_id: &SessionId) {
1590        let active_sessions = {
1591            let mut write_guard = self.env.sessions_map.write();
1592            write_guard.remove(session_id);
1593            write_guard.len()
1594        };
1595        self.env
1596            .frontend_metrics
1597            .active_sessions
1598            .set(active_sessions as i64);
1599    }
1600
1601    fn connect_inner(
1602        &self,
1603        database_id: DatabaseId,
1604        user_name: &str,
1605        peer_addr: AddressRef,
1606    ) -> Result<Arc<SessionImpl>> {
1607        let catalog_reader = self.env.catalog_reader();
1608        let reader = catalog_reader.read_guard();
1609        let (database_name, database_owner) = {
1610            let db = reader.get_database_by_id(database_id)?;
1611            (db.name(), db.owner())
1612        };
1613
1614        let user_reader = self.env.user_info_reader();
1615        let reader = user_reader.read_guard();
1616        if let Some(user) = reader.get_user_by_name(user_name) {
1617            if !user.can_login {
1618                bail_permission_denied!("User {} is not allowed to login", user_name);
1619            }
1620            let has_privilege = user.has_privilege(database_id, AclMode::Connect);
1621            if !user.is_super && database_owner != user.id && !has_privilege {
1622                bail_permission_denied!("User does not have CONNECT privilege.");
1623            }
1624
1625            // Check the HBA configuration to determine the authentication method for this connection.
1626            let (connection_type, client_addr) = match peer_addr.as_ref() {
1627                Address::Tcp(socket_addr) => (ConnectionType::Host, Some(socket_addr.ip())),
1628                Address::Unix(_) => (ConnectionType::Local, None),
1629            };
1630            tracing::debug!(
1631                "receive connection: type={:?}, client_addr={:?}",
1632                connection_type,
1633                client_addr
1634            );
1635
1636            let hba_entry_opt = self.env.frontend_config().hba_config.find_matching_entry(
1637                &connection_type,
1638                database_name,
1639                user_name,
1640                client_addr.as_ref(),
1641            );
1642
1643            // TODO: add `FATAL` message support when no HBA entry matches.
1644            let Some(hba_entry) = hba_entry_opt else {
1645                bail_permission_denied!(
1646                    "no pg_hba.conf entry for host \"{peer_addr}\", user \"{user_name}\", database \"{database_name}\""
1647                );
1648            };
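            // Illustrative pg_hba.conf-style entry such a lookup might match (format
            // assumed analogous to PostgreSQL's):
            //
            //     # TYPE  DATABASE  USER  ADDRESS      METHOD
            //     host    all       all   0.0.0.0/0    md5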
1649
1650            // Determine the user authenticator based on the user's auth info.
1651            let authenticator_by_info = || -> Result<UserAuthenticator> {
1652                let authenticator = match &user.auth_info {
1653                    None => UserAuthenticator::None,
1654                    Some(auth_info) => match auth_info.encryption_type() {
1655                        EncryptionType::Plaintext => {
1656                            UserAuthenticator::ClearText(auth_info.encrypted_value.clone())
1657                        }
1658                        EncryptionType::Md5 => {
1659                            let mut salt = [0; 4];
1660                            let mut rng = rand::rng();
1661                            rng.fill_bytes(&mut salt);
1662                            UserAuthenticator::Md5WithSalt {
1663                                encrypted_password: md5_hash_with_salt(
1664                                    &auth_info.encrypted_value,
1665                                    &salt,
1666                                ),
1667                                salt,
1668                            }
1669                        }
1670                        EncryptionType::Oauth => UserAuthenticator::OAuth {
1671                            metadata: auth_info.metadata.clone(),
1672                            cluster_id: self.env.meta_client().cluster_id().to_owned(),
1673                        },
1674                        _ => {
1675                            bail_protocol_error!(
1676                                "Unsupported auth type: {}",
1677                                auth_info.encryption_type().as_str_name()
1678                            );
1679                        }
1680                    },
1681                };
1682                Ok(authenticator)
1683            };
1684
1685            let user_authenticator = match (&hba_entry.auth_method, &user.auth_info) {
1686                (AuthMethod::Trust, _) => UserAuthenticator::None,
1687                // For backward compatibility, we allow the `password` auth method to work with any auth info.
1688                (AuthMethod::Password, _) => authenticator_by_info()?,
1689                (AuthMethod::Md5, Some(auth_info))
1690                    if auth_info.encryption_type() == EncryptionType::Md5 =>
1691                {
1692                    authenticator_by_info()?
1693                }
1694                (AuthMethod::OAuth, Some(auth_info))
1695                    if auth_info.encryption_type() == EncryptionType::Oauth =>
1696                {
1697                    authenticator_by_info()?
1698                }
1699                (AuthMethod::Ldap, _) => {
1700                    UserAuthenticator::Ldap(user_name.to_owned(), hba_entry.clone())
1701                }
1702                _ => {
1703                    bail_permission_denied!(
1704                        "password authentication failed for user \"{user_name}\""
1705                    );
1706                }
1707            };
1708
1709            // Assign a session id and insert it into the sessions map (for cancel requests).
1710            let secret_key = self.number.fetch_add(1, Ordering::Relaxed);
1711            // Use a trivial strategy: process_id and secret_key are equal.
1712            let id = (secret_key, secret_key);
1713            // Read session params snapshot from frontend env.
1714            let session_config = self.env.session_params_snapshot();
1715
1716            let session_impl: Arc<SessionImpl> = SessionImpl::new(
1717                self.env.clone(),
1718                AuthContext::new(database_name.to_owned(), user_name.to_owned(), user.id),
1719                user_authenticator,
1720                id,
1721                peer_addr,
1722                session_config,
1723            )
1724            .into();
1725            self.insert_session(session_impl.clone());
1726
1727            Ok(session_impl)
1728        } else {
1729            bail_catalog_error!("Role {} does not exist", user_name);
1730        }
1731    }
1732}
1733
1734impl Session for SessionImpl {
1735    type Error = RwError;
1736    type Portal = Portal;
1737    type PreparedStatement = PrepareStatement;
1738    type ValuesStream = PgResponseStream;
1739
1740    /// A copy of `run_statement` that excludes the parsing step, so each run handles at most
1741    /// one statement. The SQL string is produced via `to_string` on the AST. Consider reusing code between the two later.
1742    async fn run_one_query(
1743        self: Arc<Self>,
1744        stmt: Statement,
1745        format: Format,
1746    ) -> Result<PgResponse<PgResponseStream>> {
1747        let string = stmt.to_string();
1748        let sql_str = string.as_str();
1749        let sql: Arc<str> = Arc::from(sql_str);
1750        // Handling can be slow. Release the potentially large String early.
1751        drop(string);
1752        let rsp = handle(self, stmt, sql, vec![format]).await?;
1753        Ok(rsp)
1754    }
1755
1756    fn user_authenticator(&self) -> &UserAuthenticator {
1757        &self.user_authenticator
1758    }
1759
1760    fn id(&self) -> SessionId {
1761        self.id
1762    }
1763
1764    async fn parse(
1765        self: Arc<Self>,
1766        statement: Option<Statement>,
1767        params_types: Vec<Option<DataType>>,
1768    ) -> Result<PrepareStatement> {
1769        Ok(if let Some(statement) = statement {
1770            handle_parse(self, statement, params_types).await?
1771        } else {
1772            PrepareStatement::Empty
1773        })
1774    }
1775
1776    fn bind(
1777        self: Arc<Self>,
1778        prepare_statement: PrepareStatement,
1779        params: Vec<Option<Bytes>>,
1780        param_formats: Vec<Format>,
1781        result_formats: Vec<Format>,
1782    ) -> Result<Portal> {
1783        handle_bind(prepare_statement, params, param_formats, result_formats)
1784    }
1785
1786    async fn execute(self: Arc<Self>, portal: Portal) -> Result<PgResponse<PgResponseStream>> {
1787        let rsp = handle_execute(self, portal).await?;
1788        Ok(rsp)
1789    }
1790
1791    fn describe_statement(
1792        self: Arc<Self>,
1793        prepare_statement: PrepareStatement,
1794    ) -> Result<(Vec<DataType>, Vec<PgFieldDescriptor>)> {
1795        Ok(match prepare_statement {
1796            PrepareStatement::Empty => (vec![], vec![]),
1797            PrepareStatement::Prepared(prepare_statement) => (
1798                prepare_statement.bound_result.param_types,
1799                infer(
1800                    Some(prepare_statement.bound_result.bound),
1801                    prepare_statement.statement,
1802                )?,
1803            ),
1804            PrepareStatement::PureStatement(statement) => (vec![], infer(None, statement)?),
1805        })
1806    }
1807
1808    fn describe_portal(self: Arc<Self>, portal: Portal) -> Result<Vec<PgFieldDescriptor>> {
1809        match portal {
1810            Portal::Empty => Ok(vec![]),
1811            Portal::Portal(portal) => {
1812                let mut columns = infer(Some(portal.bound_result.bound), portal.statement)?;
1813                let formats = FormatIterator::new(&portal.result_formats, columns.len())
1814                    .map_err(|e| RwError::from(ErrorCode::ProtocolError(e)))?;
1815                columns.iter_mut().zip_eq_fast(formats).for_each(|(c, f)| {
1816                    if f == Format::Binary {
1817                        c.set_to_binary()
1818                    }
1819                });
1820                Ok(columns)
1821            }
1822            Portal::PureStatement(statement) => Ok(infer(None, statement)?),
1823        }
1824    }
1825
1826    fn get_config(&self, key: &str) -> Result<String> {
1827        self.config().get(key).map_err(Into::into)
1828    }
1829
1830    fn set_config(&self, key: &str, value: String) -> Result<String> {
1831        Self::set_config(self, key, value)
1832    }
1833
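    /// Awaits the next notice queued by `notice_to_user`. Assumption: this is polled by
    /// the pgwire connection loop, which forwards each notice to the client.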
1834    async fn next_notice(self: &Arc<Self>) -> String {
1835        std::future::poll_fn(|cx| self.clone().notice_rx.lock().poll_recv(cx))
1836            .await
1837            .expect("notice channel should not be closed")
1838    }
1839
1840    fn transaction_status(&self) -> TransactionStatus {
1841        match &*self.txn.lock() {
1842            transaction::State::Initial | transaction::State::Implicit(_) => {
1843                TransactionStatus::Idle
1844            }
1845            transaction::State::Explicit(_) => TransactionStatus::InTransaction,
1846            // TODO: failed transaction
1847        }
1848    }
1849
1850    /// Initialize and return an `ExecContextGuard`, used as a guard representing the execution flow.
1851    fn init_exec_context(&self, sql: Arc<str>) -> ExecContextGuard {
1852        let exec_context = Arc::new(ExecContext {
1853            running_sql: sql,
1854            last_instant: Instant::now(),
1855            last_idle_instant: self.last_idle_instant.clone(),
1856        });
1857        *self.exec_context.lock() = Some(Arc::downgrade(&exec_context));
1858        // Unset the idle state, since a SQL statement is now running.
1859        *self.last_idle_instant.lock() = None;
1860        ExecContextGuard::new(exec_context)
1861    }
1862
1863    /// Check whether the idle-in-transaction timeout has elapsed.
1864    /// If so, unpin the snapshot and return an `IdleInTxnTimeout` error.
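    /// Illustrative: with `SET idle_in_transaction_session_timeout TO 60000;` (the unit
    /// is assumed to be milliseconds, as in PostgreSQL), a session idling inside an open
    /// transaction for over a minute fails this check.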
1865    fn check_idle_in_transaction_timeout(&self) -> PsqlResult<()> {
1866        // In transaction.
1867        if matches!(self.transaction_status(), TransactionStatus::InTransaction) {
1868            let idle_in_transaction_session_timeout =
1869                self.config().idle_in_transaction_session_timeout() as u128;
1870            // Idle transaction timeout has been enabled.
1871            if idle_in_transaction_session_timeout != 0 {
1872                // Hold the `exec_context` lock so no new SQL can start while the snapshot is unpinned.
1873                let guard = self.exec_context.lock();
1874                // No running SQL, i.e., the session is idle.
1875                if guard.as_ref().and_then(|weak| weak.upgrade()).is_none() {
1876                    // Idle timeout.
1877                    if let Some(elapse_since_last_idle_instant) =
1878                        self.elapse_since_last_idle_instant()
1879                        && elapse_since_last_idle_instant > idle_in_transaction_session_timeout
1880                    {
1881                        return Err(PsqlError::IdleInTxnTimeout);
1882                    }
1883                }
1884            }
1885        }
1886        Ok(())
1887    }
1888}
1889
1890/// Returns the row description of a statement.
1891fn infer(bound: Option<BoundStatement>, stmt: Statement) -> Result<Vec<PgFieldDescriptor>> {
1892    match stmt {
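        // For these variants the callers (`describe_statement` / `describe_portal`)
        // always supply a `BoundStatement`, so the `unwrap` below is expected not to
        // panic (assumption based on the call sites in this file).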
1893        Statement::Query(_)
1894        | Statement::Insert { .. }
1895        | Statement::Delete { .. }
1896        | Statement::Update { .. }
1897        | Statement::FetchCursor { .. } => Ok(bound
1898            .unwrap()
1899            .output_fields()
1900            .iter()
1901            .map(to_pg_field)
1902            .collect()),
1903        Statement::ShowObjects {
1904            object: show_object,
1905            ..
1906        } => Ok(infer_show_object(&show_object)),
1907        Statement::ShowCreateObject { .. } => Ok(infer_show_create_object()),
1908        Statement::ShowTransactionIsolationLevel => {
1909            let name = "transaction_isolation";
1910            Ok(infer_show_variable(name))
1911        }
1912        Statement::ShowVariable { variable } => {
1913            let name = &variable[0].real_value().to_lowercase();
1914            Ok(infer_show_variable(name))
1915        }
1916        Statement::Describe { name: _, kind } => Ok(infer_describe(&kind)),
1917        Statement::Explain { .. } => Ok(vec![PgFieldDescriptor::new(
1918            "QUERY PLAN".to_owned(),
1919            DataType::Varchar.to_oid(),
1920            DataType::Varchar.type_len(),
1921        )]),
1922        _ => Ok(vec![]),
1923    }
1924}
1925
1926pub struct WorkerProcessId {
1927    pub worker_id: WorkerId,
1928    pub process_id: i32,
1929}
1930
1931impl WorkerProcessId {
1932    pub fn new(worker_id: WorkerId, process_id: i32) -> Self {
1933        Self {
1934            worker_id,
1935            process_id,
1936        }
1937    }
1938}
1939
1940impl Display for WorkerProcessId {
1941    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1942        write!(f, "{}:{}", self.worker_id, self.process_id)
1943    }
1944}
1945
1946impl TryFrom<String> for WorkerProcessId {
1947    type Error = String;
1948
1949    fn try_from(worker_process_id: String) -> std::result::Result<Self, Self::Error> {
1950        const INVALID: &str = "invalid WorkerProcessId";
1951        let splits: Vec<&str> = worker_process_id.split(':').collect();
1952        if splits.len() != 2 {
1953            return Err(INVALID.to_owned());
1954        }
1955        let Ok(worker_id) = splits[0].parse::<u32>() else {
1956            return Err(INVALID.to_owned());
1957        };
1958        let Ok(process_id) = splits[1].parse::<i32>() else {
1959            return Err(INVALID.to_owned());
1960        };
1961        Ok(WorkerProcessId::new(worker_id.into(), process_id))
1962    }
1963}
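
// A minimal round-trip test sketch for `WorkerProcessId` parsing (illustrative addition):
#[cfg(test)]
mod worker_process_id_tests {
    use super::*;

    #[test]
    fn parse_and_display_round_trip() {
        let id = WorkerProcessId::try_from("42:7".to_owned()).expect("valid id");
        assert_eq!(id.to_string(), "42:7");
        // Malformed inputs are rejected.
        assert!(WorkerProcessId::try_from("42".to_owned()).is_err());
        assert!(WorkerProcessId::try_from("a:b".to_owned()).is_err());
        assert!(WorkerProcessId::try_from("1:2:3".to_owned()).is_err());
    }
}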
1964
1965pub fn cancel_queries_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
1966    let guard = sessions_map.read();
1967    if let Some(session) = guard.get(&session_id) {
1968        session.cancel_current_query();
1969        true
1970    } else {
1971        info!("Current session finished, ignoring cancel query request");
1972        false
1973    }
1974}
1975
1976pub fn cancel_creating_jobs_in_session(session_id: SessionId, sessions_map: SessionMapRef) -> bool {
1977    let guard = sessions_map.read();
1978    if let Some(session) = guard.get(&session_id) {
1979        session.cancel_current_creating_job();
1980        true
1981    } else {
1982        info!("Current session finished, ignoring cancel creating request");
1983        false
1984    }
1985}