risingwave_stream/task/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use anyhow::anyhow;
18use parking_lot::{MappedMutexGuard, Mutex, MutexGuard, RwLock};
19use risingwave_common::config::StreamingConfig;
20use risingwave_common::util::addr::HostAddr;
21use risingwave_pb::common::ActorInfo;
22use risingwave_rpc_client::ComputeClientPoolRef;
23
24use crate::error::StreamResult;
25use crate::executor::exchange::permit::{self, Receiver, Sender};
26
27mod barrier_manager;
28mod env;
29mod stream_manager;
30
31pub use barrier_manager::*;
32pub use env::*;
33use risingwave_common::catalog::DatabaseId;
34pub use stream_manager::*;
35
36pub type ConsumableChannelPair = (Option<Sender>, Option<Receiver>);
37pub type ActorId = u32;
38pub type FragmentId = u32;
39pub type DispatcherId = u64;
40pub type UpDownActorIds = (ActorId, ActorId);
41pub type UpDownFragmentIds = (FragmentId, FragmentId);
42
/// Newtype wrapper around a raw `u32` partial graph id.
///
/// Build one with `PartialGraphId::new`; get the raw value back with `u32::from(id)`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct PartialGraphId(u32);
45
/// Database id used by unit tests (`u32::MAX`).
#[cfg(test)]
pub(crate) const TEST_DATABASE_ID: DatabaseId = DatabaseId::new(u32::MAX);

/// Partial graph id used by unit tests (`u32::MAX`).
#[cfg(test)]
pub(crate) const TEST_PARTIAL_GRAPH_ID: PartialGraphId = PartialGraphId(u32::MAX);
52
53impl PartialGraphId {
54    fn new(id: u32) -> Self {
55        Self(id)
56    }
57}
58
59impl From<PartialGraphId> for u32 {
60    fn from(val: PartialGraphId) -> u32 {
61        val.0
62    }
63}
64
/// Stores the information which may be modified from the data plane.
///
/// The data structure is created in `LocalBarrierWorker` and is shared by actors created
/// between two recoveries. In every recovery, the `LocalBarrierWorker` will create a new instance of
/// `SharedContext`, and the original one becomes stale. The new one is shared by actors created after
/// recovery.
///
/// Each instance is tied to a single database (`database_id`) and carries the `term_id` it was
/// created with.
pub struct SharedContext {
    /// Database this context was created for (the `database_id` passed to `SharedContext::new`).
    pub(crate) database_id: DatabaseId,
    /// Term identifier supplied at construction; a copy is returned by `term_id()`.
    // NOTE(review): presumably distinguishes contexts created by successive recoveries — confirm.
    term_id: String,

    /// Stores the senders and receivers for later `Processor`'s usage.
    ///
    /// Each actor has several senders and several receivers. Senders and receivers are created
    /// during `update_actors` and stored in a channel map. Upon `build_actors`, all these channels
    /// will be taken out and built into the executors and outputs.
    /// One sender or one receiver can be uniquely determined by the upstream and downstream actor
    /// id.
    ///
    /// There are three cases when we need local channels to pass around messages:
    /// 1. pass `Message` between two local actors
    /// 2. The RPC client at the downstream actor forwards received `Message` to one channel in
    ///    `ReceiverExecutor` or `MergerExecutor`.
    /// 3. The RPC `Output` at the upstream actor forwards received `Message` to
    ///    `ExchangeServiceImpl`.
    ///
    /// The channel serves as a buffer because `ExchangeServiceImpl`
    /// is on the server-side and we will also introduce backpressure.
    ///
    /// Entries are keyed by `(upstream, downstream)` actor ids. In this file a pair is created
    /// lazily by `get_or_insert_channels` the first time either half is requested. `drop_actors`
    /// removes only the entries whose *upstream* actor is dropped.
    channel_map: Mutex<HashMap<UpDownActorIds, ConsumableChannelPair>>,

    /// Actor information keyed by actor id. Filled by `add_actors`, read by `get_actor_info`, and
    /// pruned by `drop_actors`.
    actor_infos: RwLock<HashMap<ActorId, ActorInfo>>,

    /// Stores the local address.
    ///
    /// It is used to test whether an actor is local or not,
    /// thus determining whether we should set up only a local channel or a remote RPC connection
    /// between two actors.
    pub(crate) addr: HostAddr,

    /// Compute client pool for streaming gRPC exchange.
    // TODO: currently the client pool won't be cleared. Should remove compute clients when
    // disconnected.
    pub(crate) compute_client_pool: ComputeClientPoolRef,

    /// Streaming configuration copied from the environment at construction. Its
    /// `developer.exchange_*` settings size every channel created in `channel_map`.
    pub(crate) config: StreamingConfig,
}
111
112impl std::fmt::Debug for SharedContext {
113    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114        f.debug_struct("SharedContext")
115            .field("addr", &self.addr)
116            .finish_non_exhaustive()
117    }
118}
119
120impl SharedContext {
121    pub fn new(database_id: DatabaseId, env: &StreamEnvironment, term_id: String) -> Self {
122        Self {
123            database_id,
124            term_id,
125            channel_map: Default::default(),
126            actor_infos: Default::default(),
127            addr: env.server_address().clone(),
128            config: env.config().as_ref().to_owned(),
129            compute_client_pool: env.client_pool(),
130        }
131    }
132
133    pub fn term_id(&self) -> String {
134        self.term_id.clone()
135    }
136
137    #[cfg(test)]
138    pub fn for_test() -> Self {
139        use std::sync::Arc;
140
141        use risingwave_common::config::StreamingDeveloperConfig;
142        use risingwave_rpc_client::ComputeClientPool;
143
144        Self {
145            database_id: TEST_DATABASE_ID,
146            term_id: "for_test".into(),
147            channel_map: Default::default(),
148            actor_infos: Default::default(),
149            addr: LOCAL_TEST_ADDR.clone(),
150            config: StreamingConfig {
151                developer: StreamingDeveloperConfig {
152                    exchange_initial_permits: permit::for_test::INITIAL_PERMITS,
153                    exchange_batched_permits: permit::for_test::BATCHED_PERMITS,
154                    exchange_concurrent_barriers: permit::for_test::CONCURRENT_BARRIERS,
155                    ..Default::default()
156                },
157                ..Default::default()
158            },
159            compute_client_pool: Arc::new(ComputeClientPool::for_test()),
160        }
161    }
162
163    /// Get the channel pair for the given actor ids. If the channel pair does not exist, create one
164    /// with the configured permits.
165    fn get_or_insert_channels(
166        &self,
167        ids: UpDownActorIds,
168    ) -> MappedMutexGuard<'_, ConsumableChannelPair> {
169        MutexGuard::map(self.channel_map.lock(), |map| {
170            map.entry(ids).or_insert_with(|| {
171                let (tx, rx) = permit::channel(
172                    self.config.developer.exchange_initial_permits,
173                    self.config.developer.exchange_batched_permits,
174                    self.config.developer.exchange_concurrent_barriers,
175                );
176                (Some(tx), Some(rx))
177            })
178        })
179    }
180
181    pub fn take_sender(&self, ids: &UpDownActorIds) -> StreamResult<Sender> {
182        self.get_or_insert_channels(*ids)
183            .0
184            .take()
185            .ok_or_else(|| anyhow!("sender for {ids:?} has already been taken").into())
186    }
187
188    pub fn take_receiver(&self, ids: UpDownActorIds) -> StreamResult<Receiver> {
189        self.get_or_insert_channels(ids)
190            .1
191            .take()
192            .ok_or_else(|| anyhow!("receiver for {ids:?} has already been taken").into())
193    }
194
195    pub fn get_actor_info(&self, actor_id: &ActorId) -> StreamResult<ActorInfo> {
196        self.actor_infos
197            .read()
198            .get(actor_id)
199            .cloned()
200            .ok_or_else(|| anyhow!("actor {} not found in info table", actor_id).into())
201    }
202
203    pub fn config(&self) -> &StreamingConfig {
204        &self.config
205    }
206
207    pub(super) fn drop_actors(&self, actors: &HashSet<ActorId>) {
208        self.channel_map
209            .lock()
210            .retain(|(up_id, _), _| !actors.contains(up_id));
211        let mut actor_infos = self.actor_infos.write();
212        for actor_id in actors {
213            actor_infos.remove(actor_id);
214        }
215    }
216
217    pub(crate) fn add_actors(&self, new_actor_infos: impl Iterator<Item = ActorInfo>) {
218        let mut actor_infos = self.actor_infos.write();
219        for actor in new_actor_infos {
220            if let Some(prev_actor) = actor_infos.get(&actor.actor_id) {
221                if cfg!(debug_assertions) {
222                    panic!("duplicate actor info: {:?} {:?}", actor, actor_infos);
223                }
224                if prev_actor != &actor {
225                    warn!(
226                        ?prev_actor,
227                        ?actor,
228                        "add actor again but have different actor info. ignored"
229                    );
230                }
231            } else {
232                actor_infos.insert(actor.actor_id, actor);
233            }
234        }
235    }
236}