risingwave_frontend/scheduler/
task_context.rs1use std::sync::Arc;
16
17use prometheus::core::Atomic;
18use risingwave_batch::error::Result;
19use risingwave_batch::monitor::BatchMetrics;
20use risingwave_batch::task::{BatchTaskContext, TaskOutput, TaskOutputId};
21use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
22use risingwave_common::catalog::SysCatalogReaderRef;
23use risingwave_common::config::BatchConfig;
24use risingwave_common::memory::MemoryContext;
25use risingwave_common::metrics::TrAdderAtomic;
26use risingwave_common::util::addr::{HostAddr, is_local_address};
27use risingwave_connector::source::monitor::SourceMetrics;
28use risingwave_rpc_client::ComputeClientPoolRef;
29
30use crate::catalog::system_catalog::SysCatalogReaderImpl;
31use crate::session::SessionImpl;
32
33#[derive(Clone)]
35pub struct FrontendBatchTaskContext {
36 session: Arc<SessionImpl>,
37
38 mem_context: MemoryContext,
39}
40
41impl FrontendBatchTaskContext {
42 pub fn create(session: Arc<SessionImpl>) -> Arc<dyn BatchTaskContext> {
43 let mem_context =
44 MemoryContext::new(Some(session.env().mem_context()), TrAdderAtomic::new(0));
45 Arc::new(Self {
46 session,
47 mem_context,
48 })
49 }
50}
51
52impl BatchTaskContext for FrontendBatchTaskContext {
53 fn get_task_output(&self, _task_output_id: TaskOutputId) -> Result<TaskOutput> {
54 unimplemented!("not supported in local mode")
55 }
56
57 fn catalog_reader(&self) -> SysCatalogReaderRef {
58 Arc::new(SysCatalogReaderImpl::new(
59 self.session.env().catalog_reader().clone(),
60 self.session.env().user_info_reader().clone(),
61 self.session.env().meta_client_ref(),
62 self.session.auth_context(),
63 self.session.shared_config(),
64 self.session.env().system_params_manager().get_params(),
65 ))
66 }
67
68 fn is_local_addr(&self, peer_addr: &HostAddr) -> bool {
69 is_local_address(self.session.env().server_address(), peer_addr)
70 }
71
72 fn state_store(&self) -> risingwave_storage::store_impl::StateStoreImpl {
73 unimplemented!("not supported in local mode")
74 }
75
76 fn batch_metrics(&self) -> Option<BatchMetrics> {
77 None
78 }
79
80 fn client_pool(&self) -> ComputeClientPoolRef {
81 self.session.env().client_pool()
82 }
83
84 fn get_config(&self) -> &BatchConfig {
85 self.session.env().batch_config()
86 }
87
88 fn dml_manager(&self) -> risingwave_dml::dml_manager::DmlManagerRef {
89 unimplemented!("not supported in local mode")
90 }
91
92 fn source_metrics(&self) -> Arc<SourceMetrics> {
93 self.session.env().source_metrics()
94 }
95
96 fn spill_metrics(&self) -> Arc<risingwave_batch::monitor::BatchSpillMetrics> {
97 self.session.env().spill_metrics()
98 }
99
100 fn create_executor_mem_context(&self, _executor_id: &str) -> MemoryContext {
101 MemoryContext::new(Some(self.mem_context.clone()), TrAdderAtomic::new(0))
102 }
103
104 fn worker_node_manager(&self) -> Option<WorkerNodeManagerRef> {
105 Some(self.session.env().worker_node_manager_ref())
106 }
107}