risingwave_stream/task/
env.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use hytra::TrAdder;
18use risingwave_common::config::StreamingConfig;
19pub(crate) use risingwave_common::id::WorkerId as WorkerNodeId;
20use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
21use risingwave_common::util::addr::HostAddr;
22use risingwave_connector::source::monitor::SourceMetrics;
23use risingwave_dml::dml_manager::DmlManagerRef;
24use risingwave_rpc_client::{ComputeClientPoolRef, MetaClient};
25use risingwave_storage::StateStoreImpl;
26
/// The global environment for task execution.
///
/// The instance is shared by every task. The type is `Clone`; cloning an `Arc`-wrapped field
/// hands out another handle to the same value rather than copying it.
#[derive(Clone, Debug)]
pub struct StreamEnvironment {
    /// Endpoint the stream manager listens on.
    server_addr: HostAddr,

    /// Streaming related configurations.
    ///
    /// This is the global config for the whole compute node. An actor may have its config
    /// overridden, so inside an executor use `actor_context.config` instead.
    global_config: Arc<StreamingConfig>,

    /// Id of the current worker node.
    worker_id: WorkerNodeId,

    /// State store for table scanning.
    state_store: StateStoreImpl,

    /// Manages DML information.
    dml_manager: DmlManagerRef,

    /// Used to read the latest system parameters.
    system_params_manager: LocalSystemParamsManagerRef,

    /// Metrics for sources.
    source_metrics: Arc<SourceMetrics>,

    /// Total memory usage in stream.
    total_mem_val: Arc<TrAdder<i64>>,

    /// Meta client. `None` is used for tests only (see `for_test`).
    meta_client: Option<MetaClient>,

    /// Compute client pool for streaming gRPC exchange.
    client_pool: ComputeClientPoolRef,
}
64
65impl StreamEnvironment {
66    #[allow(clippy::too_many_arguments)]
67    pub fn new(
68        server_addr: HostAddr,
69        global_config: Arc<StreamingConfig>,
70        worker_id: WorkerNodeId,
71        state_store: StateStoreImpl,
72        dml_manager: DmlManagerRef,
73        system_params_manager: LocalSystemParamsManagerRef,
74        source_metrics: Arc<SourceMetrics>,
75        meta_client: MetaClient,
76        client_pool: ComputeClientPoolRef,
77    ) -> Self {
78        StreamEnvironment {
79            server_addr,
80            global_config,
81            worker_id,
82            state_store,
83            dml_manager,
84            system_params_manager,
85            source_metrics,
86            total_mem_val: Arc::new(TrAdder::new()),
87            meta_client: Some(meta_client),
88            client_pool,
89        }
90    }
91
92    // Create an instance for testing purpose.
93    pub fn for_test() -> Self {
94        use risingwave_common::system_param::local_manager::LocalSystemParamsManager;
95        use risingwave_dml::dml_manager::DmlManager;
96        use risingwave_rpc_client::ComputeClientPool;
97        use risingwave_storage::monitor::MonitoredStorageMetrics;
98        StreamEnvironment {
99            server_addr: "127.0.0.1:2333".parse().unwrap(),
100            global_config: Arc::new(StreamingConfig::default()),
101            worker_id: WorkerNodeId::default(),
102            state_store: StateStoreImpl::shared_in_memory_store(Arc::new(
103                MonitoredStorageMetrics::unused(),
104            )),
105            dml_manager: Arc::new(DmlManager::for_test()),
106            system_params_manager: Arc::new(LocalSystemParamsManager::for_test()),
107            source_metrics: Arc::new(SourceMetrics::default()),
108            total_mem_val: Arc::new(TrAdder::new()),
109            meta_client: None,
110            client_pool: Arc::new(ComputeClientPool::for_test()),
111        }
112    }
113
114    pub fn server_address(&self) -> &HostAddr {
115        &self.server_addr
116    }
117
118    pub fn global_config(&self) -> &Arc<StreamingConfig> {
119        &self.global_config
120    }
121
122    pub fn worker_id(&self) -> WorkerNodeId {
123        self.worker_id
124    }
125
126    pub fn state_store(&self) -> StateStoreImpl {
127        self.state_store.clone()
128    }
129
130    pub fn dml_manager_ref(&self) -> DmlManagerRef {
131        self.dml_manager.clone()
132    }
133
134    pub fn system_params_manager_ref(&self) -> LocalSystemParamsManagerRef {
135        self.system_params_manager.clone()
136    }
137
138    pub fn source_metrics(&self) -> Arc<SourceMetrics> {
139        self.source_metrics.clone()
140    }
141
142    pub fn total_mem_usage(&self) -> Arc<TrAdder<i64>> {
143        self.total_mem_val.clone()
144    }
145
146    pub fn meta_client(&self) -> Option<MetaClient> {
147        self.meta_client.clone()
148    }
149
150    pub fn client_pool(&self) -> ComputeClientPoolRef {
151        self.client_pool.clone()
152    }
153}