risingwave_stream/task/mod.rs
// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};

use anyhow::anyhow;
use parking_lot::{MappedMutexGuard, Mutex, MutexGuard, RwLock};
use risingwave_common::config::StreamingConfig;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::common::ActorInfo;
use risingwave_rpc_client::ComputeClientPoolRef;

use crate::error::StreamResult;
use crate::executor::exchange::permit::{self, Receiver, Sender};

mod barrier_manager;
mod env;
mod stream_manager;

pub use barrier_manager::*;
pub use env::*;
pub use stream_manager::*;

/// A sender-receiver pair for an exchange channel. Each endpoint can be taken out at most once.
pub type ConsumableChannelPair = (Option<Sender>, Option<Receiver>);
pub type ActorId = u32;
pub type FragmentId = u32;
pub type DispatcherId = u64;
/// `(upstream_actor_id, downstream_actor_id)`.
pub type UpDownActorIds = (ActorId, ActorId);
/// `(upstream_fragment_id, downstream_fragment_id)`.
pub type UpDownFragmentIds = (FragmentId, FragmentId);

#[derive(Hash, Eq, PartialEq, Copy, Clone, Debug)]
struct PartialGraphId(u64);

impl PartialGraphId {
    fn new(id: u64) -> Self {
        Self(id)
    }
}

impl From<PartialGraphId> for u64 {
    fn from(val: PartialGraphId) -> u64 {
        val.0
    }
}
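
// A minimal sketch (not part of the original module) exercising the newtype
// round trip above: `PartialGraphId::new` wraps a `u64`, and the `From` impl
// converts it back.
#[cfg(test)]
mod partial_graph_id_tests {
    use super::*;

    #[test]
    fn round_trips_through_u64() {
        let id = PartialGraphId::new(42);
        assert_eq!(u64::from(id), 42);
    }
}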

/// Stores the information that may be modified from the data plane.
///
/// The data structure is created in `LocalBarrierWorker` and is shared by the actors created
/// between two recoveries. On every recovery, the `LocalBarrierWorker` creates a new instance of
/// `SharedContext` and the original one becomes stale; the new one is shared by the actors
/// created after recovery.
pub struct SharedContext {
    /// Stores the senders and receivers for later `Processor`'s usage.
    ///
    /// Each actor has several senders and several receivers. Senders and receivers are created
    /// during `update_actors` and stored in a channel map. Upon `build_actors`, all these channels
    /// will be taken out and built into the executors and outputs.
    /// A sender or a receiver is uniquely identified by the pair of upstream and downstream actor
    /// ids.
    ///
    /// There are three cases where we need local channels to pass around messages:
    /// 1. passing `Message`s between two local actors
    /// 2. the RPC client at the downstream actor forwarding received `Message`s to a channel in
    ///    `ReceiverExecutor` or `MergeExecutor`
    /// 3. the RPC `Output` at the upstream actor forwarding received `Message`s to
    ///    `ExchangeServiceImpl`
    ///
    /// In all cases, the channel serves as a buffer, since `ExchangeServiceImpl` is on the
    /// server side and the channel also lets us apply backpressure.
    pub(crate) channel_map: Mutex<HashMap<UpDownActorIds, ConsumableChannelPair>>,
    /// Stores all actor information.
    pub(crate) actor_infos: RwLock<HashMap<ActorId, ActorInfo>>,

    /// Stores the local address.
    ///
    /// It is used to test whether an actor is local or not, thus determining whether we should
    /// set up a local channel only or a remote RPC connection between two actors.
    pub(crate) addr: HostAddr,

    /// Compute client pool for streaming gRPC exchange.
    // TODO: currently the client pool won't be cleared. Should remove compute clients when
    // disconnected.
    pub(crate) compute_client_pool: ComputeClientPoolRef,

    pub(crate) config: StreamingConfig,

    pub(super) local_barrier_manager: LocalBarrierManager,
}

impl std::fmt::Debug for SharedContext {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SharedContext")
            .field("addr", &self.addr)
            .finish_non_exhaustive()
    }
}

impl SharedContext {
    pub fn new(env: &StreamEnvironment, local_barrier_manager: LocalBarrierManager) -> Self {
        Self {
            channel_map: Default::default(),
            actor_infos: Default::default(),
            addr: env.server_address().clone(),
            config: env.config().as_ref().to_owned(),
            compute_client_pool: env.client_pool(),
            local_barrier_manager,
        }
    }

    #[cfg(test)]
    pub fn for_test() -> Self {
        use std::sync::Arc;

        use risingwave_common::config::StreamingDeveloperConfig;
        use risingwave_rpc_client::ComputeClientPool;

        Self {
            channel_map: Default::default(),
            actor_infos: Default::default(),
            addr: LOCAL_TEST_ADDR.clone(),
            config: StreamingConfig {
                developer: StreamingDeveloperConfig {
                    exchange_initial_permits: permit::for_test::INITIAL_PERMITS,
                    exchange_batched_permits: permit::for_test::BATCHED_PERMITS,
                    exchange_concurrent_barriers: permit::for_test::CONCURRENT_BARRIERS,
                    ..Default::default()
                },
                ..Default::default()
            },
            compute_client_pool: Arc::new(ComputeClientPool::for_test()),
            local_barrier_manager: LocalBarrierManager::for_test(),
        }
    }

    /// Get the channel pair for the given actor ids. If the channel pair does not exist, create
    /// one with the configured permits.
    fn get_or_insert_channels(
        &self,
        ids: UpDownActorIds,
    ) -> MappedMutexGuard<'_, ConsumableChannelPair> {
        MutexGuard::map(self.channel_map.lock(), |map| {
            map.entry(ids).or_insert_with(|| {
                let (tx, rx) = permit::channel(
                    self.config.developer.exchange_initial_permits,
                    self.config.developer.exchange_batched_permits,
                    self.config.developer.exchange_concurrent_barriers,
                );
                (Some(tx), Some(rx))
            })
        })
    }

    pub fn take_sender(&self, ids: &UpDownActorIds) -> StreamResult<Sender> {
        self.get_or_insert_channels(*ids)
            .0
            .take()
            .ok_or_else(|| anyhow!("sender for {ids:?} has already been taken").into())
    }

    pub fn take_receiver(&self, ids: UpDownActorIds) -> StreamResult<Receiver> {
        self.get_or_insert_channels(ids)
            .1
            .take()
            .ok_or_else(|| anyhow!("receiver for {ids:?} has already been taken").into())
    }

    pub fn get_actor_info(&self, actor_id: &ActorId) -> StreamResult<ActorInfo> {
        self.actor_infos
            .read()
            .get(actor_id)
            .cloned()
            .ok_or_else(|| anyhow!("actor {} not found in info table", actor_id).into())
    }

    pub fn config(&self) -> &StreamingConfig {
        &self.config
    }

    pub(super) fn drop_actors(&self, actors: &HashSet<ActorId>) {
        // Remove the exchange channels whose upstream actor is being dropped.
        self.channel_map
            .lock()
            .retain(|(up_id, _), _| !actors.contains(up_id));

        // Remove the information of the dropped actors.
        let mut actor_infos = self.actor_infos.write();
        for actor_id in actors {
            actor_infos.remove(actor_id);
        }
    }
}
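
// A minimal test sketch (not part of the original module) illustrating the
// take-once semantics documented on `channel_map`: the channel pair for an
// actor pair is lazily created on first access, and each endpoint can be
// taken exactly once. It assumes `SharedContext::for_test()` from this file.
#[cfg(test)]
mod shared_context_tests {
    use super::*;

    #[test]
    fn take_channel_endpoints_only_once() {
        let context = SharedContext::for_test();
        let ids: UpDownActorIds = (1, 2);

        // The first take of each endpoint succeeds; the pair is created lazily.
        assert!(context.take_sender(&ids).is_ok());
        assert!(context.take_receiver(ids).is_ok());

        // Taking again fails, since both slots are now `None`.
        assert!(context.take_sender(&ids).is_err());
        assert!(context.take_receiver(ids).is_err());
    }
}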

/// Generate a globally unique executor id by packing the actor id into the high 32 bits and the
/// operator id into the low 32 bits.
pub fn unique_executor_id(actor_id: u32, operator_id: u64) -> u64 {
    assert!(operator_id <= u32::MAX as u64);
    ((actor_id as u64) << 32) + operator_id
}

/// Generate a globally unique operator id by packing the fragment id into the high 32 bits and
/// the operator id into the low 32 bits.
pub fn unique_operator_id(fragment_id: u32, operator_id: u64) -> u64 {
    assert!(operator_id <= u32::MAX as u64);
    ((fragment_id as u64) << 32) + operator_id
}
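
// A small sketch (not from the original file) checking the id-packing layout
// above: the high 32 bits carry the actor / fragment id and the low 32 bits
// carry the operator id, so ids never collide across actors or fragments as
// long as `operator_id` fits in a `u32`.
#[cfg(test)]
mod unique_id_tests {
    use super::*;

    #[test]
    fn id_packing_layout() {
        let id = unique_executor_id(3, 5);
        assert_eq!(id >> 32, 3); // actor id in the high bits
        assert_eq!(id & u32::MAX as u64, 5); // operator id in the low bits
        // `unique_operator_id` uses the same packing scheme.
        assert_eq!(unique_operator_id(3, 5), id);
    }
}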