risingwave_batch/task/
env.rs1use std::sync::Arc;
16
17use risingwave_common::config::{BatchConfig, MetricLevel};
18use risingwave_common::util::addr::HostAddr;
19use risingwave_common::util::worker_util::WorkerNodeId;
20use risingwave_connector::source::iceberg::IcebergScanMetrics;
21use risingwave_connector::source::monitor::SourceMetrics;
22use risingwave_dml::dml_manager::DmlManagerRef;
23use risingwave_rpc_client::ComputeClientPoolRef;
24use risingwave_storage::StateStoreImpl;
25
26use crate::monitor::{BatchExecutorMetrics, BatchManagerMetrics, BatchSpillMetrics};
27use crate::task::BatchManager;
28
29#[derive(Clone)]
32pub struct BatchEnvironment {
33 server_addr: HostAddr,
35
36 task_manager: Arc<BatchManager>,
38
39 config: Arc<BatchConfig>,
41
42 worker_id: WorkerNodeId,
44
45 state_store: StateStoreImpl,
47
48 executor_metrics: Arc<BatchExecutorMetrics>,
50
51 client_pool: ComputeClientPoolRef,
53
54 dml_manager: DmlManagerRef,
56
57 source_metrics: Arc<SourceMetrics>,
59
60 spill_metrics: Arc<BatchSpillMetrics>,
62
63 iceberg_scan_metrics: Arc<IcebergScanMetrics>,
65
66 metric_level: MetricLevel,
67}
68
69impl BatchEnvironment {
70 #[allow(clippy::too_many_arguments)]
71 pub fn new(
72 task_manager: Arc<BatchManager>,
73 server_addr: HostAddr,
74 config: Arc<BatchConfig>,
75 worker_id: WorkerNodeId,
76 state_store: StateStoreImpl,
77 executor_metrics: Arc<BatchExecutorMetrics>,
78 client_pool: ComputeClientPoolRef,
79 dml_manager: DmlManagerRef,
80 source_metrics: Arc<SourceMetrics>,
81 spill_metrics: Arc<BatchSpillMetrics>,
82 iceberg_scan_metrics: Arc<IcebergScanMetrics>,
83 metric_level: MetricLevel,
84 ) -> Self {
85 BatchEnvironment {
86 server_addr,
87 task_manager,
88 config,
89 worker_id,
90 state_store,
91 executor_metrics,
92 client_pool,
93 dml_manager,
94 source_metrics,
95 spill_metrics,
96 iceberg_scan_metrics,
97 metric_level,
98 }
99 }
100
101 pub fn for_test() -> Self {
103 use risingwave_dml::dml_manager::DmlManager;
104 use risingwave_rpc_client::ComputeClientPool;
105 use risingwave_storage::monitor::MonitoredStorageMetrics;
106
107 BatchEnvironment {
108 task_manager: Arc::new(BatchManager::new(
109 BatchConfig::default(),
110 BatchManagerMetrics::for_test(),
111 u64::MAX,
112 )),
113 server_addr: "127.0.0.1:2333".parse().unwrap(),
114 config: Arc::new(BatchConfig::default()),
115 worker_id: WorkerNodeId::default(),
116 state_store: StateStoreImpl::shared_in_memory_store(Arc::new(
117 MonitoredStorageMetrics::unused(),
118 )),
119 client_pool: Arc::new(ComputeClientPool::for_test()),
120 dml_manager: Arc::new(DmlManager::for_test()),
121 source_metrics: Arc::new(SourceMetrics::default()),
122 executor_metrics: BatchExecutorMetrics::for_test(),
123 spill_metrics: BatchSpillMetrics::for_test(),
124 iceberg_scan_metrics: IcebergScanMetrics::for_test(),
125 metric_level: MetricLevel::Debug,
126 }
127 }
128
129 pub fn server_address(&self) -> &HostAddr {
130 &self.server_addr
131 }
132
133 pub fn task_manager(&self) -> Arc<BatchManager> {
134 self.task_manager.clone()
135 }
136
137 pub fn config(&self) -> &BatchConfig {
138 self.config.as_ref()
139 }
140
141 pub fn worker_id(&self) -> WorkerNodeId {
142 self.worker_id
143 }
144
145 pub fn state_store(&self) -> StateStoreImpl {
146 self.state_store.clone()
147 }
148
149 pub fn manager_metrics(&self) -> Arc<BatchManagerMetrics> {
150 self.task_manager.metrics()
151 }
152
153 pub fn executor_metrics(&self) -> Arc<BatchExecutorMetrics> {
154 self.executor_metrics.clone()
155 }
156
157 pub fn client_pool(&self) -> ComputeClientPoolRef {
158 self.client_pool.clone()
159 }
160
161 pub fn dml_manager_ref(&self) -> DmlManagerRef {
162 self.dml_manager.clone()
163 }
164
165 pub fn source_metrics(&self) -> Arc<SourceMetrics> {
166 self.source_metrics.clone()
167 }
168
169 pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
170 self.spill_metrics.clone()
171 }
172
173 pub fn metric_level(&self) -> MetricLevel {
174 self.metric_level
175 }
176
177 pub fn iceberg_scan_metrics(&self) -> Arc<IcebergScanMetrics> {
178 self.iceberg_scan_metrics.clone()
179 }
180}