risingwave_stream/task/
env.rs1use std::sync::Arc;
16
17use hytra::TrAdder;
18use risingwave_common::config::StreamingConfig;
19pub(crate) use risingwave_common::id::WorkerId as WorkerNodeId;
20use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
21use risingwave_common::util::addr::HostAddr;
22use risingwave_connector::source::monitor::SourceMetrics;
23use risingwave_dml::dml_manager::DmlManagerRef;
24use risingwave_rpc_client::{ComputeClientPoolRef, MetaClient};
25use risingwave_storage::StateStoreImpl;
26
27#[derive(Clone, Debug)]
30pub struct StreamEnvironment {
31 server_addr: HostAddr,
33
34 global_config: Arc<StreamingConfig>,
39
40 worker_id: WorkerNodeId,
42
43 state_store: StateStoreImpl,
45
46 dml_manager: DmlManagerRef,
48
49 system_params_manager: LocalSystemParamsManagerRef,
51
52 source_metrics: Arc<SourceMetrics>,
54
55 total_mem_val: Arc<TrAdder<i64>>,
57
58 meta_client: Option<MetaClient>,
60
61 client_pool: ComputeClientPoolRef,
63}
64
65impl StreamEnvironment {
66 #[allow(clippy::too_many_arguments)]
67 pub fn new(
68 server_addr: HostAddr,
69 global_config: Arc<StreamingConfig>,
70 worker_id: WorkerNodeId,
71 state_store: StateStoreImpl,
72 dml_manager: DmlManagerRef,
73 system_params_manager: LocalSystemParamsManagerRef,
74 source_metrics: Arc<SourceMetrics>,
75 meta_client: MetaClient,
76 client_pool: ComputeClientPoolRef,
77 ) -> Self {
78 StreamEnvironment {
79 server_addr,
80 global_config,
81 worker_id,
82 state_store,
83 dml_manager,
84 system_params_manager,
85 source_metrics,
86 total_mem_val: Arc::new(TrAdder::new()),
87 meta_client: Some(meta_client),
88 client_pool,
89 }
90 }
91
92 pub fn for_test() -> Self {
94 use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
95 use risingwave_dml::dml_manager::DmlManager;
96 use risingwave_rpc_client::ComputeClientPool;
97 use risingwave_storage::monitor::MonitoredStorageMetrics;
98 StreamEnvironment {
99 server_addr: "127.0.0.1:2333".parse().unwrap(),
100 global_config: Arc::new(StreamingConfig::default()),
101 worker_id: WorkerNodeId::default(),
102 state_store: StateStoreImpl::shared_in_memory_store(Arc::new(
103 MonitoredStorageMetrics::unused(),
104 )),
105 dml_manager: Arc::new(DmlManager::for_test()),
106 system_params_manager: Arc::new(LocalSystemParamsManager::for_test()),
107 source_metrics: Arc::new(SourceMetrics::default()),
108 total_mem_val: Arc::new(TrAdder::new()),
109 meta_client: None,
110 client_pool: Arc::new(ComputeClientPool::for_test()),
111 }
112 }
113
114 pub fn server_address(&self) -> &HostAddr {
115 &self.server_addr
116 }
117
118 pub fn global_config(&self) -> &Arc<StreamingConfig> {
119 &self.global_config
120 }
121
122 pub fn worker_id(&self) -> WorkerNodeId {
123 self.worker_id
124 }
125
126 pub fn state_store(&self) -> StateStoreImpl {
127 self.state_store.clone()
128 }
129
130 pub fn dml_manager_ref(&self) -> DmlManagerRef {
131 self.dml_manager.clone()
132 }
133
134 pub fn system_params_manager_ref(&self) -> LocalSystemParamsManagerRef {
135 self.system_params_manager.clone()
136 }
137
138 pub fn source_metrics(&self) -> Arc<SourceMetrics> {
139 self.source_metrics.clone()
140 }
141
142 pub fn total_mem_usage(&self) -> Arc<TrAdder<i64>> {
143 self.total_mem_val.clone()
144 }
145
146 pub fn meta_client(&self) -> Option<MetaClient> {
147 self.meta_client.clone()
148 }
149
150 pub fn client_pool(&self) -> ComputeClientPoolRef {
151 self.client_pool.clone()
152 }
153}