risingwave_stream/task/
env.rs1use std::sync::Arc;
16
17use hytra::TrAdder;
18use risingwave_common::config::StreamingConfig;
19pub(crate) use risingwave_common::id::WorkerId as WorkerNodeId;
20use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
21use risingwave_common::util::addr::HostAddr;
22use risingwave_connector::source::monitor::SourceMetrics;
23use risingwave_dml::dml_manager::DmlManagerRef;
24use risingwave_rpc_client::{ComputeClientPoolRef, MetaClient};
25use risingwave_storage::StateStoreImpl;
26use tokio::sync::Semaphore;
27
28#[derive(Clone, Debug)]
31pub struct StreamEnvironment {
32 server_addr: HostAddr,
34
35 global_config: Arc<StreamingConfig>,
40
41 worker_id: WorkerNodeId,
43
44 state_store: StateStoreImpl,
46
47 dml_manager: DmlManagerRef,
49
50 system_params_manager: LocalSystemParamsManagerRef,
52
53 source_metrics: Arc<SourceMetrics>,
55
56 total_mem_val: Arc<TrAdder<i64>>,
58
59 meta_client: Option<MetaClient>,
61
62 client_pool: ComputeClientPoolRef,
64
65 kv_log_store_historical_read_semaphore: Option<Arc<Semaphore>>,
68}
69
70impl StreamEnvironment {
71 #[allow(clippy::too_many_arguments)]
72 pub fn new(
73 server_addr: HostAddr,
74 global_config: Arc<StreamingConfig>,
75 worker_id: WorkerNodeId,
76 state_store: StateStoreImpl,
77 dml_manager: DmlManagerRef,
78 system_params_manager: LocalSystemParamsManagerRef,
79 source_metrics: Arc<SourceMetrics>,
80 meta_client: MetaClient,
81 client_pool: ComputeClientPoolRef,
82 ) -> Self {
83 let kv_log_store_historical_read_semaphore = {
84 let max = global_config
85 .developer
86 .max_concurrent_kv_log_store_historical_read;
87 if max > 0 {
88 Some(Arc::new(Semaphore::new(max)))
89 } else {
90 None
91 }
92 };
93 StreamEnvironment {
94 server_addr,
95 global_config,
96 worker_id,
97 state_store,
98 dml_manager,
99 system_params_manager,
100 source_metrics,
101 total_mem_val: Arc::new(TrAdder::new()),
102 meta_client: Some(meta_client),
103 client_pool,
104 kv_log_store_historical_read_semaphore,
105 }
106 }
107
108 pub fn for_test() -> Self {
110 use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
111 use risingwave_dml::dml_manager::DmlManager;
112 use risingwave_rpc_client::ComputeClientPool;
113 use risingwave_storage::monitor::MonitoredStorageMetrics;
114 StreamEnvironment {
115 server_addr: "127.0.0.1:2333".parse().unwrap(),
116 global_config: Arc::new(StreamingConfig::default()),
117 worker_id: WorkerNodeId::default(),
118 state_store: StateStoreImpl::shared_in_memory_store(Arc::new(
119 MonitoredStorageMetrics::unused(),
120 )),
121 dml_manager: Arc::new(DmlManager::for_test()),
122 system_params_manager: Arc::new(LocalSystemParamsManager::for_test()),
123 source_metrics: Arc::new(SourceMetrics::default()),
124 total_mem_val: Arc::new(TrAdder::new()),
125 meta_client: None,
126 client_pool: Arc::new(ComputeClientPool::for_test()),
127 kv_log_store_historical_read_semaphore: None,
128 }
129 }
130
131 pub fn server_address(&self) -> &HostAddr {
132 &self.server_addr
133 }
134
135 pub fn global_config(&self) -> &Arc<StreamingConfig> {
136 &self.global_config
137 }
138
139 pub fn worker_id(&self) -> WorkerNodeId {
140 self.worker_id
141 }
142
143 pub fn state_store(&self) -> StateStoreImpl {
144 self.state_store.clone()
145 }
146
147 pub fn dml_manager_ref(&self) -> DmlManagerRef {
148 self.dml_manager.clone()
149 }
150
151 pub fn system_params_manager_ref(&self) -> LocalSystemParamsManagerRef {
152 self.system_params_manager.clone()
153 }
154
155 pub fn source_metrics(&self) -> Arc<SourceMetrics> {
156 self.source_metrics.clone()
157 }
158
159 pub fn total_mem_usage(&self) -> Arc<TrAdder<i64>> {
160 self.total_mem_val.clone()
161 }
162
163 pub fn meta_client(&self) -> Option<MetaClient> {
164 self.meta_client.clone()
165 }
166
167 pub fn client_pool(&self) -> ComputeClientPoolRef {
168 self.client_pool.clone()
169 }
170
171 pub fn kv_log_store_historical_read_semaphore(&self) -> Option<Arc<Semaphore>> {
172 self.kv_log_store_historical_read_semaphore.clone()
173 }
174}