risingwave_frontend/scheduler/
task_context.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use prometheus::core::Atomic;
18use risingwave_batch::error::Result;
19use risingwave_batch::monitor::BatchMetrics;
20use risingwave_batch::task::{BatchTaskContext, TaskOutput, TaskOutputId};
21use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
22use risingwave_common::catalog::SysCatalogReaderRef;
23use risingwave_common::config::BatchConfig;
24use risingwave_common::memory::MemoryContext;
25use risingwave_common::metrics::TrAdderAtomic;
26use risingwave_common::util::addr::{HostAddr, is_local_address};
27use risingwave_connector::source::monitor::SourceMetrics;
28use risingwave_rpc_client::ComputeClientPoolRef;
29
30use crate::catalog::system_catalog::SysCatalogReaderImpl;
31use crate::session::SessionImpl;
32
33/// Batch task execution context in frontend.
34#[derive(Clone)]
35pub struct FrontendBatchTaskContext {
36    session: Arc<SessionImpl>,
37
38    mem_context: MemoryContext,
39}
40
41impl FrontendBatchTaskContext {
42    pub fn create(session: Arc<SessionImpl>) -> Arc<dyn BatchTaskContext> {
43        let mem_context =
44            MemoryContext::new(Some(session.env().mem_context()), TrAdderAtomic::new(0));
45        Arc::new(Self {
46            session,
47            mem_context,
48        })
49    }
50}
51
52impl BatchTaskContext for FrontendBatchTaskContext {
53    fn get_task_output(&self, _task_output_id: TaskOutputId) -> Result<TaskOutput> {
54        unimplemented!("not supported in local mode")
55    }
56
57    fn catalog_reader(&self) -> SysCatalogReaderRef {
58        Arc::new(SysCatalogReaderImpl::new(
59            self.session.env().catalog_reader().clone(),
60            self.session.env().user_info_reader().clone(),
61            self.session.env().meta_client_ref(),
62            self.session.auth_context(),
63            self.session.shared_config(),
64            self.session.env().system_params_manager().get_params(),
65        ))
66    }
67
68    fn is_local_addr(&self, peer_addr: &HostAddr) -> bool {
69        is_local_address(self.session.env().server_address(), peer_addr)
70    }
71
72    fn state_store(&self) -> risingwave_storage::store_impl::StateStoreImpl {
73        unimplemented!("not supported in local mode")
74    }
75
76    fn batch_metrics(&self) -> Option<BatchMetrics> {
77        None
78    }
79
80    fn client_pool(&self) -> ComputeClientPoolRef {
81        self.session.env().client_pool()
82    }
83
84    fn get_config(&self) -> &BatchConfig {
85        self.session.env().batch_config()
86    }
87
88    fn dml_manager(&self) -> risingwave_dml::dml_manager::DmlManagerRef {
89        unimplemented!("not supported in local mode")
90    }
91
92    fn source_metrics(&self) -> Arc<SourceMetrics> {
93        self.session.env().source_metrics()
94    }
95
96    fn spill_metrics(&self) -> Arc<risingwave_batch::monitor::BatchSpillMetrics> {
97        self.session.env().spill_metrics()
98    }
99
100    fn create_executor_mem_context(&self, _executor_id: &str) -> MemoryContext {
101        MemoryContext::new(Some(self.mem_context.clone()), TrAdderAtomic::new(0))
102    }
103
104    fn worker_node_manager(&self) -> Option<WorkerNodeManagerRef> {
105        Some(self.session.env().worker_node_manager_ref())
106    }
107}