risingwave_stream/task/
env.rs1use std::sync::Arc;
16
17use hytra::TrAdder;
18use risingwave_common::config::StreamingConfig;
19use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
20use risingwave_common::util::addr::HostAddr;
21use risingwave_connector::source::monitor::SourceMetrics;
22use risingwave_dml::dml_manager::DmlManagerRef;
23use risingwave_rpc_client::{ComputeClientPoolRef, MetaClient};
24use risingwave_storage::StateStoreImpl;
25
/// Identifier of a worker node; currently a plain `u32`.
pub(crate) type WorkerNodeId = u32;
27
28#[derive(Clone, Debug)]
31pub struct StreamEnvironment {
32 server_addr: HostAddr,
34
35 config: Arc<StreamingConfig>,
37
38 worker_id: WorkerNodeId,
40
41 state_store: StateStoreImpl,
43
44 dml_manager: DmlManagerRef,
46
47 system_params_manager: LocalSystemParamsManagerRef,
49
50 source_metrics: Arc<SourceMetrics>,
52
53 total_mem_val: Arc<TrAdder<i64>>,
55
56 meta_client: Option<MetaClient>,
58
59 client_pool: ComputeClientPoolRef,
61}
62
63impl StreamEnvironment {
64 #[allow(clippy::too_many_arguments)]
65 pub fn new(
66 server_addr: HostAddr,
67 config: Arc<StreamingConfig>,
68 worker_id: WorkerNodeId,
69 state_store: StateStoreImpl,
70 dml_manager: DmlManagerRef,
71 system_params_manager: LocalSystemParamsManagerRef,
72 source_metrics: Arc<SourceMetrics>,
73 meta_client: MetaClient,
74 client_pool: ComputeClientPoolRef,
75 ) -> Self {
76 StreamEnvironment {
77 server_addr,
78 config,
79 worker_id,
80 state_store,
81 dml_manager,
82 system_params_manager,
83 source_metrics,
84 total_mem_val: Arc::new(TrAdder::new()),
85 meta_client: Some(meta_client),
86 client_pool,
87 }
88 }
89
90 #[cfg(test)]
92 pub fn for_test() -> Self {
93 use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
94 use risingwave_dml::dml_manager::DmlManager;
95 use risingwave_rpc_client::ComputeClientPool;
96 use risingwave_storage::monitor::MonitoredStorageMetrics;
97 StreamEnvironment {
98 server_addr: "127.0.0.1:2333".parse().unwrap(),
99 config: Arc::new(StreamingConfig::default()),
100 worker_id: WorkerNodeId::default(),
101 state_store: StateStoreImpl::shared_in_memory_store(Arc::new(
102 MonitoredStorageMetrics::unused(),
103 )),
104 dml_manager: Arc::new(DmlManager::for_test()),
105 system_params_manager: Arc::new(LocalSystemParamsManager::for_test()),
106 source_metrics: Arc::new(SourceMetrics::default()),
107 total_mem_val: Arc::new(TrAdder::new()),
108 meta_client: None,
109 client_pool: Arc::new(ComputeClientPool::for_test()),
110 }
111 }
112
113 pub fn server_address(&self) -> &HostAddr {
114 &self.server_addr
115 }
116
117 pub fn config(&self) -> &Arc<StreamingConfig> {
118 &self.config
119 }
120
121 pub fn worker_id(&self) -> WorkerNodeId {
122 self.worker_id
123 }
124
125 pub fn state_store(&self) -> StateStoreImpl {
126 self.state_store.clone()
127 }
128
129 pub fn dml_manager_ref(&self) -> DmlManagerRef {
130 self.dml_manager.clone()
131 }
132
133 pub fn system_params_manager_ref(&self) -> LocalSystemParamsManagerRef {
134 self.system_params_manager.clone()
135 }
136
137 pub fn source_metrics(&self) -> Arc<SourceMetrics> {
138 self.source_metrics.clone()
139 }
140
141 pub fn total_mem_usage(&self) -> Arc<TrAdder<i64>> {
142 self.total_mem_val.clone()
143 }
144
145 pub fn meta_client(&self) -> Option<MetaClient> {
146 self.meta_client.clone()
147 }
148
149 pub fn client_pool(&self) -> ComputeClientPoolRef {
150 self.client_pool.clone()
151 }
152}