risingwave_stream/task/
env.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use hytra::TrAdder;
18use risingwave_common::config::StreamingConfig;
19pub(crate) use risingwave_common::id::WorkerId as WorkerNodeId;
20use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
21use risingwave_common::util::addr::HostAddr;
22use risingwave_connector::source::monitor::SourceMetrics;
23use risingwave_dml::dml_manager::DmlManagerRef;
24use risingwave_rpc_client::{ComputeClientPoolRef, MetaClient};
25use risingwave_storage::StateStoreImpl;
26use tokio::sync::Semaphore;
27
28/// The global environment for task execution.
29/// The instance will be shared by every task.
30#[derive(Clone, Debug)]
31pub struct StreamEnvironment {
32    /// Endpoint the stream manager listens on.
33    server_addr: HostAddr,
34
35    /// Streaming related configurations.
36    ///
37    /// This is the global config for the whole compute node. Actor may have its config overridden.
38    /// In executor, use `actor_context.config` instead.
39    global_config: Arc<StreamingConfig>,
40
41    /// Current worker node id.
42    worker_id: WorkerNodeId,
43
44    /// State store for table scanning.
45    state_store: StateStoreImpl,
46
47    /// Manages dml information.
48    dml_manager: DmlManagerRef,
49
50    /// Read the latest system parameters.
51    system_params_manager: LocalSystemParamsManagerRef,
52
53    /// Metrics for source.
54    source_metrics: Arc<SourceMetrics>,
55
56    /// Total memory usage in stream.
57    total_mem_val: Arc<TrAdder<i64>>,
58
59    /// Meta client. Use `None` for test only
60    meta_client: Option<MetaClient>,
61
62    /// Compute client pool for streaming gRPC exchange.
63    client_pool: ComputeClientPoolRef,
64
65    /// Semaphore to limit the number of kv log store readers concurrently reading historical data.
66    /// `None` means unlimited.
67    kv_log_store_historical_read_semaphore: Option<Arc<Semaphore>>,
68}
69
70impl StreamEnvironment {
71    #[allow(clippy::too_many_arguments)]
72    pub fn new(
73        server_addr: HostAddr,
74        global_config: Arc<StreamingConfig>,
75        worker_id: WorkerNodeId,
76        state_store: StateStoreImpl,
77        dml_manager: DmlManagerRef,
78        system_params_manager: LocalSystemParamsManagerRef,
79        source_metrics: Arc<SourceMetrics>,
80        meta_client: MetaClient,
81        client_pool: ComputeClientPoolRef,
82    ) -> Self {
83        let kv_log_store_historical_read_semaphore = {
84            let max = global_config
85                .developer
86                .max_concurrent_kv_log_store_historical_read;
87            if max > 0 {
88                Some(Arc::new(Semaphore::new(max)))
89            } else {
90                None
91            }
92        };
93        StreamEnvironment {
94            server_addr,
95            global_config,
96            worker_id,
97            state_store,
98            dml_manager,
99            system_params_manager,
100            source_metrics,
101            total_mem_val: Arc::new(TrAdder::new()),
102            meta_client: Some(meta_client),
103            client_pool,
104            kv_log_store_historical_read_semaphore,
105        }
106    }
107
108    // Create an instance for testing purpose.
109    pub fn for_test() -> Self {
110        use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
111        use risingwave_dml::dml_manager::DmlManager;
112        use risingwave_rpc_client::ComputeClientPool;
113        use risingwave_storage::monitor::MonitoredStorageMetrics;
114        StreamEnvironment {
115            server_addr: "127.0.0.1:2333".parse().unwrap(),
116            global_config: Arc::new(StreamingConfig::default()),
117            worker_id: WorkerNodeId::default(),
118            state_store: StateStoreImpl::shared_in_memory_store(Arc::new(
119                MonitoredStorageMetrics::unused(),
120            )),
121            dml_manager: Arc::new(DmlManager::for_test()),
122            system_params_manager: Arc::new(LocalSystemParamsManager::for_test()),
123            source_metrics: Arc::new(SourceMetrics::default()),
124            total_mem_val: Arc::new(TrAdder::new()),
125            meta_client: None,
126            client_pool: Arc::new(ComputeClientPool::for_test()),
127            kv_log_store_historical_read_semaphore: None,
128        }
129    }
130
131    pub fn server_address(&self) -> &HostAddr {
132        &self.server_addr
133    }
134
135    pub fn global_config(&self) -> &Arc<StreamingConfig> {
136        &self.global_config
137    }
138
139    pub fn worker_id(&self) -> WorkerNodeId {
140        self.worker_id
141    }
142
143    pub fn state_store(&self) -> StateStoreImpl {
144        self.state_store.clone()
145    }
146
147    pub fn dml_manager_ref(&self) -> DmlManagerRef {
148        self.dml_manager.clone()
149    }
150
151    pub fn system_params_manager_ref(&self) -> LocalSystemParamsManagerRef {
152        self.system_params_manager.clone()
153    }
154
155    pub fn source_metrics(&self) -> Arc<SourceMetrics> {
156        self.source_metrics.clone()
157    }
158
159    pub fn total_mem_usage(&self) -> Arc<TrAdder<i64>> {
160        self.total_mem_val.clone()
161    }
162
163    pub fn meta_client(&self) -> Option<MetaClient> {
164        self.meta_client.clone()
165    }
166
167    pub fn client_pool(&self) -> ComputeClientPoolRef {
168        self.client_pool.clone()
169    }
170
171    pub fn kv_log_store_historical_read_semaphore(&self) -> Option<Arc<Semaphore>> {
172        self.kv_log_store_historical_read_semaphore.clone()
173    }
174}