risingwave_batch/task/
env.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use risingwave_common::config::{BatchConfig, MetricLevel};
18use risingwave_common::util::addr::HostAddr;
19use risingwave_common::util::worker_util::WorkerNodeId;
20use risingwave_connector::source::iceberg::IcebergScanMetrics;
21use risingwave_connector::source::monitor::SourceMetrics;
22use risingwave_dml::dml_manager::DmlManagerRef;
23use risingwave_rpc_client::ComputeClientPoolRef;
24use risingwave_storage::StateStoreImpl;
25
26use crate::monitor::{BatchExecutorMetrics, BatchManagerMetrics, BatchSpillMetrics};
27use crate::task::BatchManager;
28
/// The global environment for task execution.
/// The instance will be shared by every task.
#[derive(Clone)]
pub struct BatchEnvironment {
    /// Endpoint the batch task manager listens on.
    server_addr: HostAddr,

    /// Reference to the task manager.
    task_manager: Arc<BatchManager>,

    /// Batch related configurations.
    config: Arc<BatchConfig>,

    /// Current worker node id.
    worker_id: WorkerNodeId,

    /// State store for table scanning.
    state_store: StateStoreImpl,

    /// Executor level metrics.
    executor_metrics: Arc<BatchExecutorMetrics>,

    /// Compute client pool for batch gRPC exchange.
    client_pool: ComputeClientPoolRef,

    /// Manages dml information.
    dml_manager: DmlManagerRef,

    /// Metrics for source.
    source_metrics: Arc<SourceMetrics>,

    /// Batch spill metrics.
    spill_metrics: Arc<BatchSpillMetrics>,

    /// Metrics for iceberg scan.
    iceberg_scan_metrics: Arc<IcebergScanMetrics>,

    /// Configured metric collection level (see [`MetricLevel`]).
    metric_level: MetricLevel,
}
68
69impl BatchEnvironment {
70    #[allow(clippy::too_many_arguments)]
71    pub fn new(
72        task_manager: Arc<BatchManager>,
73        server_addr: HostAddr,
74        config: Arc<BatchConfig>,
75        worker_id: WorkerNodeId,
76        state_store: StateStoreImpl,
77        executor_metrics: Arc<BatchExecutorMetrics>,
78        client_pool: ComputeClientPoolRef,
79        dml_manager: DmlManagerRef,
80        source_metrics: Arc<SourceMetrics>,
81        spill_metrics: Arc<BatchSpillMetrics>,
82        iceberg_scan_metrics: Arc<IcebergScanMetrics>,
83        metric_level: MetricLevel,
84    ) -> Self {
85        BatchEnvironment {
86            server_addr,
87            task_manager,
88            config,
89            worker_id,
90            state_store,
91            executor_metrics,
92            client_pool,
93            dml_manager,
94            source_metrics,
95            spill_metrics,
96            iceberg_scan_metrics,
97            metric_level,
98        }
99    }
100
101    // Create an instance for testing purpose.
102    pub fn for_test() -> Self {
103        use risingwave_dml::dml_manager::DmlManager;
104        use risingwave_rpc_client::ComputeClientPool;
105        use risingwave_storage::monitor::MonitoredStorageMetrics;
106
107        BatchEnvironment {
108            task_manager: Arc::new(BatchManager::new(
109                BatchConfig::default(),
110                BatchManagerMetrics::for_test(),
111                u64::MAX,
112            )),
113            server_addr: "127.0.0.1:2333".parse().unwrap(),
114            config: Arc::new(BatchConfig::default()),
115            worker_id: WorkerNodeId::default(),
116            state_store: StateStoreImpl::shared_in_memory_store(Arc::new(
117                MonitoredStorageMetrics::unused(),
118            )),
119            client_pool: Arc::new(ComputeClientPool::for_test()),
120            dml_manager: Arc::new(DmlManager::for_test()),
121            source_metrics: Arc::new(SourceMetrics::default()),
122            executor_metrics: BatchExecutorMetrics::for_test(),
123            spill_metrics: BatchSpillMetrics::for_test(),
124            iceberg_scan_metrics: IcebergScanMetrics::for_test(),
125            metric_level: MetricLevel::Debug,
126        }
127    }
128
129    pub fn server_address(&self) -> &HostAddr {
130        &self.server_addr
131    }
132
133    pub fn task_manager(&self) -> Arc<BatchManager> {
134        self.task_manager.clone()
135    }
136
137    pub fn config(&self) -> &BatchConfig {
138        self.config.as_ref()
139    }
140
141    pub fn worker_id(&self) -> WorkerNodeId {
142        self.worker_id
143    }
144
145    pub fn state_store(&self) -> StateStoreImpl {
146        self.state_store.clone()
147    }
148
149    pub fn manager_metrics(&self) -> Arc<BatchManagerMetrics> {
150        self.task_manager.metrics()
151    }
152
153    pub fn executor_metrics(&self) -> Arc<BatchExecutorMetrics> {
154        self.executor_metrics.clone()
155    }
156
157    pub fn client_pool(&self) -> ComputeClientPoolRef {
158        self.client_pool.clone()
159    }
160
161    pub fn dml_manager_ref(&self) -> DmlManagerRef {
162        self.dml_manager.clone()
163    }
164
165    pub fn source_metrics(&self) -> Arc<SourceMetrics> {
166        self.source_metrics.clone()
167    }
168
169    pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
170        self.spill_metrics.clone()
171    }
172
173    pub fn metric_level(&self) -> MetricLevel {
174        self.metric_level
175    }
176
177    pub fn iceberg_scan_metrics(&self) -> Arc<IcebergScanMetrics> {
178        self.iceberg_scan_metrics.clone()
179    }
180}