risingwave_stream/task/
env.rs1use std::sync::Arc;
16
17use hytra::TrAdder;
18use risingwave_common::config::StreamingConfig;
19use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
20use risingwave_common::util::addr::HostAddr;
21use risingwave_connector::source::monitor::SourceMetrics;
22use risingwave_dml::dml_manager::DmlManagerRef;
23use risingwave_rpc_client::{ComputeClientPoolRef, MetaClient};
24use risingwave_storage::StateStoreImpl;
25
26pub(crate) type WorkerNodeId = u32;
27
28#[derive(Clone, Debug)]
31pub struct StreamEnvironment {
32 server_addr: HostAddr,
34
35 config: Arc<StreamingConfig>,
37
38 worker_id: WorkerNodeId,
40
41 state_store: StateStoreImpl,
43
44 dml_manager: DmlManagerRef,
46
47 system_params_manager: LocalSystemParamsManagerRef,
49
50 source_metrics: Arc<SourceMetrics>,
52
53 total_mem_val: Arc<TrAdder<i64>>,
55
56 meta_client: Option<MetaClient>,
58
59 client_pool: ComputeClientPoolRef,
61}
62
63impl StreamEnvironment {
64 #[allow(clippy::too_many_arguments)]
65 pub fn new(
66 server_addr: HostAddr,
67 config: Arc<StreamingConfig>,
68 worker_id: WorkerNodeId,
69 state_store: StateStoreImpl,
70 dml_manager: DmlManagerRef,
71 system_params_manager: LocalSystemParamsManagerRef,
72 source_metrics: Arc<SourceMetrics>,
73 meta_client: MetaClient,
74 client_pool: ComputeClientPoolRef,
75 ) -> Self {
76 StreamEnvironment {
77 server_addr,
78 config,
79 worker_id,
80 state_store,
81 dml_manager,
82 system_params_manager,
83 source_metrics,
84 total_mem_val: Arc::new(TrAdder::new()),
85 meta_client: Some(meta_client),
86 client_pool,
87 }
88 }
89
90 pub fn for_test() -> Self {
92 use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
93 use risingwave_dml::dml_manager::DmlManager;
94 use risingwave_rpc_client::ComputeClientPool;
95 use risingwave_storage::monitor::MonitoredStorageMetrics;
96 StreamEnvironment {
97 server_addr: "127.0.0.1:2333".parse().unwrap(),
98 config: Arc::new(StreamingConfig::default()),
99 worker_id: WorkerNodeId::default(),
100 state_store: StateStoreImpl::shared_in_memory_store(Arc::new(
101 MonitoredStorageMetrics::unused(),
102 )),
103 dml_manager: Arc::new(DmlManager::for_test()),
104 system_params_manager: Arc::new(LocalSystemParamsManager::for_test()),
105 source_metrics: Arc::new(SourceMetrics::default()),
106 total_mem_val: Arc::new(TrAdder::new()),
107 meta_client: None,
108 client_pool: Arc::new(ComputeClientPool::for_test()),
109 }
110 }
111
112 pub fn server_address(&self) -> &HostAddr {
113 &self.server_addr
114 }
115
116 pub fn config(&self) -> &Arc<StreamingConfig> {
117 &self.config
118 }
119
120 pub fn worker_id(&self) -> WorkerNodeId {
121 self.worker_id
122 }
123
124 pub fn state_store(&self) -> StateStoreImpl {
125 self.state_store.clone()
126 }
127
128 pub fn dml_manager_ref(&self) -> DmlManagerRef {
129 self.dml_manager.clone()
130 }
131
132 pub fn system_params_manager_ref(&self) -> LocalSystemParamsManagerRef {
133 self.system_params_manager.clone()
134 }
135
136 pub fn source_metrics(&self) -> Arc<SourceMetrics> {
137 self.source_metrics.clone()
138 }
139
140 pub fn total_mem_usage(&self) -> Arc<TrAdder<i64>> {
141 self.total_mem_val.clone()
142 }
143
144 pub fn meta_client(&self) -> Option<MetaClient> {
145 self.meta_client.clone()
146 }
147
148 pub fn client_pool(&self) -> ComputeClientPoolRef {
149 self.client_pool.clone()
150 }
151}