risingwave_stream/task/
env.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use hytra::TrAdder;
18use risingwave_common::config::StreamingConfig;
19use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
20use risingwave_common::util::addr::HostAddr;
21use risingwave_connector::source::monitor::SourceMetrics;
22use risingwave_dml::dml_manager::DmlManagerRef;
23use risingwave_rpc_client::{ComputeClientPoolRef, MetaClient};
24use risingwave_storage::StateStoreImpl;
25
/// Numeric identifier of a worker node; a plain alias for `u32`.
pub(crate) type WorkerNodeId = u32;
27
28/// The global environment for task execution.
29/// The instance will be shared by every task.
30#[derive(Clone, Debug)]
31pub struct StreamEnvironment {
32    /// Endpoint the stream manager listens on.
33    server_addr: HostAddr,
34
35    /// Streaming related configurations.
36    config: Arc<StreamingConfig>,
37
38    /// Current worker node id.
39    worker_id: WorkerNodeId,
40
41    /// State store for table scanning.
42    state_store: StateStoreImpl,
43
44    /// Manages dml information.
45    dml_manager: DmlManagerRef,
46
47    /// Read the latest system parameters.
48    system_params_manager: LocalSystemParamsManagerRef,
49
50    /// Metrics for source.
51    source_metrics: Arc<SourceMetrics>,
52
53    /// Total memory usage in stream.
54    total_mem_val: Arc<TrAdder<i64>>,
55
56    /// Meta client. Use `None` for test only
57    meta_client: Option<MetaClient>,
58
59    /// Compute client pool for streaming gRPC exchange.
60    client_pool: ComputeClientPoolRef,
61}
62
63impl StreamEnvironment {
64    #[allow(clippy::too_many_arguments)]
65    pub fn new(
66        server_addr: HostAddr,
67        config: Arc<StreamingConfig>,
68        worker_id: WorkerNodeId,
69        state_store: StateStoreImpl,
70        dml_manager: DmlManagerRef,
71        system_params_manager: LocalSystemParamsManagerRef,
72        source_metrics: Arc<SourceMetrics>,
73        meta_client: MetaClient,
74        client_pool: ComputeClientPoolRef,
75    ) -> Self {
76        StreamEnvironment {
77            server_addr,
78            config,
79            worker_id,
80            state_store,
81            dml_manager,
82            system_params_manager,
83            source_metrics,
84            total_mem_val: Arc::new(TrAdder::new()),
85            meta_client: Some(meta_client),
86            client_pool,
87        }
88    }
89
90    // Create an instance for testing purpose.
91    #[cfg(test)]
92    pub fn for_test() -> Self {
93        use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
94        use risingwave_dml::dml_manager::DmlManager;
95        use risingwave_rpc_client::ComputeClientPool;
96        use risingwave_storage::monitor::MonitoredStorageMetrics;
97        StreamEnvironment {
98            server_addr: "127.0.0.1:2333".parse().unwrap(),
99            config: Arc::new(StreamingConfig::default()),
100            worker_id: WorkerNodeId::default(),
101            state_store: StateStoreImpl::shared_in_memory_store(Arc::new(
102                MonitoredStorageMetrics::unused(),
103            )),
104            dml_manager: Arc::new(DmlManager::for_test()),
105            system_params_manager: Arc::new(LocalSystemParamsManager::for_test()),
106            source_metrics: Arc::new(SourceMetrics::default()),
107            total_mem_val: Arc::new(TrAdder::new()),
108            meta_client: None,
109            client_pool: Arc::new(ComputeClientPool::for_test()),
110        }
111    }
112
113    pub fn server_address(&self) -> &HostAddr {
114        &self.server_addr
115    }
116
117    pub fn config(&self) -> &Arc<StreamingConfig> {
118        &self.config
119    }
120
121    pub fn worker_id(&self) -> WorkerNodeId {
122        self.worker_id
123    }
124
125    pub fn state_store(&self) -> StateStoreImpl {
126        self.state_store.clone()
127    }
128
129    pub fn dml_manager_ref(&self) -> DmlManagerRef {
130        self.dml_manager.clone()
131    }
132
133    pub fn system_params_manager_ref(&self) -> LocalSystemParamsManagerRef {
134        self.system_params_manager.clone()
135    }
136
137    pub fn source_metrics(&self) -> Arc<SourceMetrics> {
138        self.source_metrics.clone()
139    }
140
141    pub fn total_mem_usage(&self) -> Arc<TrAdder<i64>> {
142        self.total_mem_val.clone()
143    }
144
145    pub fn meta_client(&self) -> Option<MetaClient> {
146        self.meta_client.clone()
147    }
148
149    pub fn client_pool(&self) -> ComputeClientPoolRef {
150        self.client_pool.clone()
151    }
152}