risingwave_frontend/scheduler/
task_context.rs1use std::sync::Arc;
16
17use prometheus::core::Atomic;
18use risingwave_batch::error::Result;
19use risingwave_batch::monitor::BatchMetrics;
20use risingwave_batch::task::{BatchTaskContext, TaskOutput, TaskOutputId};
21use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef;
22use risingwave_common::catalog::SysCatalogReaderRef;
23use risingwave_common::config::BatchConfig;
24use risingwave_common::memory::MemoryContext;
25use risingwave_common::metrics::TrAdderAtomic;
26use risingwave_common::metrics_reader::MetricsReader;
27use risingwave_common::util::addr::{HostAddr, is_local_address};
28use risingwave_connector::source::monitor::SourceMetrics;
29use risingwave_rpc_client::ComputeClientPoolRef;
30
31use crate::catalog::system_catalog::SysCatalogReaderImpl;
32use crate::metrics_reader::MetricsReaderImpl;
33use crate::session::SessionImpl;
34
35#[derive(Clone)]
37pub struct FrontendBatchTaskContext {
38 session: Arc<SessionImpl>,
39
40 mem_context: MemoryContext,
41}
42
43impl FrontendBatchTaskContext {
44 pub fn create(session: Arc<SessionImpl>) -> Arc<dyn BatchTaskContext> {
45 let mem_context =
46 MemoryContext::new(Some(session.env().mem_context()), TrAdderAtomic::new(0));
47 Arc::new(Self {
48 session,
49 mem_context,
50 })
51 }
52}
53
54impl BatchTaskContext for FrontendBatchTaskContext {
55 fn get_task_output(&self, _task_output_id: TaskOutputId) -> Result<TaskOutput> {
56 unimplemented!("not supported in local mode")
57 }
58
59 fn catalog_reader(&self) -> SysCatalogReaderRef {
60 Arc::new(SysCatalogReaderImpl::new(
61 self.session.env().catalog_reader().clone(),
62 self.session.env().user_info_reader().clone(),
63 self.session.env().meta_client_ref(),
64 self.session.auth_context(),
65 self.session.shared_config(),
66 self.session.env().system_params_manager().get_params(),
67 ))
68 }
69
70 fn is_local_addr(&self, peer_addr: &HostAddr) -> bool {
71 is_local_address(self.session.env().server_address(), peer_addr)
72 }
73
74 fn state_store(&self) -> risingwave_storage::store_impl::StateStoreImpl {
75 unimplemented!("not supported in local mode")
76 }
77
78 fn batch_metrics(&self) -> Option<BatchMetrics> {
79 None
80 }
81
82 fn client_pool(&self) -> ComputeClientPoolRef {
83 self.session.env().client_pool()
84 }
85
86 fn get_config(&self) -> &BatchConfig {
87 self.session.env().batch_config()
88 }
89
90 fn dml_manager(&self) -> risingwave_dml::dml_manager::DmlManagerRef {
91 unimplemented!("not supported in local mode")
92 }
93
94 fn source_metrics(&self) -> Arc<SourceMetrics> {
95 self.session.env().source_metrics()
96 }
97
98 fn spill_metrics(&self) -> Arc<risingwave_batch::monitor::BatchSpillMetrics> {
99 self.session.env().spill_metrics()
100 }
101
102 fn create_executor_mem_context(&self, _executor_id: &str) -> MemoryContext {
103 MemoryContext::new(Some(self.mem_context.clone()), TrAdderAtomic::new(0))
104 }
105
106 fn worker_node_manager(&self) -> Option<WorkerNodeManagerRef> {
107 Some(self.session.env().worker_node_manager_ref())
108 }
109
110 fn metrics_reader(&self) -> Arc<dyn MetricsReader> {
111 Arc::new(MetricsReaderImpl::new(
112 self.session.env().prometheus_client().cloned(),
113 self.session.env().prometheus_selector().to_owned(),
114 ))
115 }
116}