risingwave_frontend/scheduler/
task_context.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use prometheus::core::Atomic;
18use risingwave_batch::error::Result;
19use risingwave_batch::monitor::BatchMetrics;
20use risingwave_batch::task::{BatchTaskContext, TaskOutput, TaskOutputId};
21use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
22use risingwave_common::catalog::SysCatalogReaderRef;
23use risingwave_common::config::BatchConfig;
24use risingwave_common::memory::MemoryContext;
25use risingwave_common::metrics::TrAdderAtomic;
26use risingwave_common::metrics_reader::MetricsReader;
27use risingwave_common::util::addr::{HostAddr, is_local_address};
28use risingwave_connector::source::monitor::SourceMetrics;
29use risingwave_rpc_client::ComputeClientPoolRef;
30
31use crate::catalog::system_catalog::SysCatalogReaderImpl;
32use crate::metrics_reader::MetricsReaderImpl;
33use crate::session::SessionImpl;
34
35/// Batch task execution context in frontend.
36#[derive(Clone)]
37pub struct FrontendBatchTaskContext {
38    session: Arc<SessionImpl>,
39
40    mem_context: MemoryContext,
41}
42
43impl FrontendBatchTaskContext {
44    pub fn create(session: Arc<SessionImpl>) -> Arc<dyn BatchTaskContext> {
45        let mem_context =
46            MemoryContext::new(Some(session.env().mem_context()), TrAdderAtomic::new(0));
47        Arc::new(Self {
48            session,
49            mem_context,
50        })
51    }
52}
53
54impl BatchTaskContext for FrontendBatchTaskContext {
55    fn get_task_output(&self, _task_output_id: TaskOutputId) -> Result<TaskOutput> {
56        unimplemented!("not supported in local mode")
57    }
58
59    fn catalog_reader(&self) -> SysCatalogReaderRef {
60        Arc::new(SysCatalogReaderImpl::new(
61            self.session.env().catalog_reader().clone(),
62            self.session.env().user_info_reader().clone(),
63            self.session.env().meta_client_ref(),
64            self.session.auth_context(),
65            self.session.shared_config(),
66            self.session.env().system_params_manager().get_params(),
67        ))
68    }
69
70    fn is_local_addr(&self, peer_addr: &HostAddr) -> bool {
71        is_local_address(self.session.env().server_address(), peer_addr)
72    }
73
74    fn state_store(&self) -> risingwave_storage::store_impl::StateStoreImpl {
75        unimplemented!("not supported in local mode")
76    }
77
78    fn batch_metrics(&self) -> Option<BatchMetrics> {
79        None
80    }
81
82    fn client_pool(&self) -> ComputeClientPoolRef {
83        self.session.env().client_pool()
84    }
85
86    fn get_config(&self) -> &BatchConfig {
87        self.session.env().batch_config()
88    }
89
90    fn dml_manager(&self) -> risingwave_dml::dml_manager::DmlManagerRef {
91        unimplemented!("not supported in local mode")
92    }
93
94    fn source_metrics(&self) -> Arc<SourceMetrics> {
95        self.session.env().source_metrics()
96    }
97
98    fn spill_metrics(&self) -> Arc<risingwave_batch::monitor::BatchSpillMetrics> {
99        self.session.env().spill_metrics()
100    }
101
102    fn create_executor_mem_context(&self, _executor_id: &str) -> MemoryContext {
103        MemoryContext::new(Some(self.mem_context.clone()), TrAdderAtomic::new(0))
104    }
105
106    fn worker_node_manager(&self) -> Option<WorkerNodeManagerRef> {
107        Some(self.session.env().worker_node_manager_ref())
108    }
109
110    fn metrics_reader(&self) -> Arc<dyn MetricsReader> {
111        Arc::new(MetricsReaderImpl::new(
112            self.session.env().prometheus_client().cloned(),
113            self.session.env().prometheus_selector().to_owned(),
114        ))
115    }
116}