// risingwave_batch/task/env.rs
use std::sync::Arc;
use risingwave_common::config::{BatchConfig, MetricLevel};
use risingwave_common::util::addr::HostAddr;
use risingwave_common::util::worker_util::WorkerNodeId;
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_dml::dml_manager::DmlManagerRef;
use risingwave_rpc_client::ComputeClientPoolRef;
use risingwave_storage::StateStoreImpl;
use crate::monitor::{
BatchExecutorMetrics, BatchManagerMetrics, BatchSpillMetrics, IcebergScanMetrics,
};
use crate::task::BatchManager;
/// The global environment shared by all batch tasks running on this node.
///
/// `Clone` is derived and intended to be cheap: every heavyweight member is
/// either wrapped in an `Arc` or is itself a reference-counted handle
/// (`ComputeClientPoolRef`, `DmlManagerRef`, `StateStoreImpl`).
#[derive(Clone)]
pub struct BatchEnvironment {
// Address this server is listening on (see `server_address()`).
server_addr: HostAddr,
// Manages the lifecycle of batch tasks on this node.
task_manager: Arc<BatchManager>,
// Batch-specific configuration, shared read-only.
config: Arc<BatchConfig>,
// Identifier of the worker node this environment belongs to.
worker_id: WorkerNodeId,
// Handle to the state store backend.
state_store: StateStoreImpl,
// Metrics collected at the executor level.
executor_metrics: Arc<BatchExecutorMetrics>,
// Pool of RPC clients for talking to other compute nodes.
client_pool: ComputeClientPoolRef,
// Manager for DML operations (e.g. INSERT/UPDATE/DELETE ingestion).
dml_manager: DmlManagerRef,
// Metrics for source connectors.
source_metrics: Arc<SourceMetrics>,
// Metrics for batch spill-to-disk activity.
spill_metrics: Arc<BatchSpillMetrics>,
// Metrics for Iceberg scan operations.
iceberg_scan_metrics: Arc<IcebergScanMetrics>,
// Controls how detailed the collected metrics are.
metric_level: MetricLevel,
}
impl BatchEnvironment {
#[allow(clippy::too_many_arguments)]
pub fn new(
task_manager: Arc<BatchManager>,
server_addr: HostAddr,
config: Arc<BatchConfig>,
worker_id: WorkerNodeId,
state_store: StateStoreImpl,
executor_metrics: Arc<BatchExecutorMetrics>,
client_pool: ComputeClientPoolRef,
dml_manager: DmlManagerRef,
source_metrics: Arc<SourceMetrics>,
spill_metrics: Arc<BatchSpillMetrics>,
iceberg_scan_metrics: Arc<IcebergScanMetrics>,
metric_level: MetricLevel,
) -> Self {
BatchEnvironment {
server_addr,
task_manager,
config,
worker_id,
state_store,
executor_metrics,
client_pool,
dml_manager,
source_metrics,
spill_metrics,
iceberg_scan_metrics,
metric_level,
}
}
#[cfg(test)]
pub fn for_test() -> Self {
use risingwave_dml::dml_manager::DmlManager;
use risingwave_rpc_client::ComputeClientPool;
use risingwave_storage::monitor::MonitoredStorageMetrics;
BatchEnvironment {
task_manager: Arc::new(BatchManager::new(
BatchConfig::default(),
BatchManagerMetrics::for_test(),
u64::MAX,
)),
server_addr: "127.0.0.1:2333".parse().unwrap(),
config: Arc::new(BatchConfig::default()),
worker_id: WorkerNodeId::default(),
state_store: StateStoreImpl::shared_in_memory_store(Arc::new(
MonitoredStorageMetrics::unused(),
)),
client_pool: Arc::new(ComputeClientPool::for_test()),
dml_manager: Arc::new(DmlManager::for_test()),
source_metrics: Arc::new(SourceMetrics::default()),
executor_metrics: BatchExecutorMetrics::for_test(),
spill_metrics: BatchSpillMetrics::for_test(),
iceberg_scan_metrics: IcebergScanMetrics::for_test(),
metric_level: MetricLevel::Debug,
}
}
pub fn server_address(&self) -> &HostAddr {
&self.server_addr
}
pub fn task_manager(&self) -> Arc<BatchManager> {
self.task_manager.clone()
}
pub fn config(&self) -> &BatchConfig {
self.config.as_ref()
}
pub fn worker_id(&self) -> WorkerNodeId {
self.worker_id
}
pub fn state_store(&self) -> StateStoreImpl {
self.state_store.clone()
}
pub fn manager_metrics(&self) -> Arc<BatchManagerMetrics> {
self.task_manager.metrics()
}
pub fn executor_metrics(&self) -> Arc<BatchExecutorMetrics> {
self.executor_metrics.clone()
}
pub fn client_pool(&self) -> ComputeClientPoolRef {
self.client_pool.clone()
}
pub fn dml_manager_ref(&self) -> DmlManagerRef {
self.dml_manager.clone()
}
pub fn source_metrics(&self) -> Arc<SourceMetrics> {
self.source_metrics.clone()
}
pub fn spill_metrics(&self) -> Arc<BatchSpillMetrics> {
self.spill_metrics.clone()
}
pub fn metric_level(&self) -> MetricLevel {
self.metric_level
}
pub fn iceberg_scan_metrics(&self) -> Arc<IcebergScanMetrics> {
self.iceberg_scan_metrics.clone()
}
}