// risingwave_stream/task/mod.rs
use std::collections::{HashMap, HashSet};
use anyhow::anyhow;
use parking_lot::{MappedMutexGuard, Mutex, MutexGuard, RwLock};
use risingwave_common::config::StreamingConfig;
use risingwave_common::util::addr::HostAddr;
use risingwave_pb::common::ActorInfo;
use risingwave_rpc_client::ComputeClientPoolRef;
use crate::error::StreamResult;
use crate::executor::exchange::permit::{self, Receiver, Sender};
mod barrier_manager;
mod env;
mod stream_manager;
pub use barrier_manager::*;
pub use env::*;
use risingwave_common::catalog::DatabaseId;
pub use stream_manager::*;
/// The sending and receiving ends of one exchange channel. Each end is wrapped in `Option`
/// so that it can be moved out once (see `SharedContext::take_sender` and
/// `SharedContext::take_receiver`); afterwards the slot is `None`.
pub type ConsumableChannelPair = (Option<Sender>, Option<Receiver>);
/// Identifier of an actor.
pub type ActorId = u32;
/// Identifier of a fragment.
pub type FragmentId = u32;
/// Identifier of a dispatcher.
pub type DispatcherId = u64;
/// A pair of actor ids used as the key of a channel. Judging by the alias name and by
/// `SharedContext::drop_actors`, the first element is the upstream actor and the second
/// the downstream actor.
pub type UpDownActorIds = (ActorId, ActorId);
/// A pair of fragment ids; presumably ordered (upstream, downstream) like
/// `UpDownActorIds` — confirm against callers.
pub type UpDownFragmentIds = (FragmentId, FragmentId);
/// Newtype wrapper around a `u32` that identifies a partial graph.
///
/// NOTE(review): what exactly a "partial graph" covers is defined outside this file.
#[derive(Hash, Eq, PartialEq, Copy, Clone, Debug)]
pub(crate) struct PartialGraphId(u32);
/// Database id used by unit tests; built from `u32::MAX`.
#[cfg(test)]
pub(crate) const TEST_DATABASE_ID: risingwave_common::catalog::DatabaseId =
risingwave_common::catalog::DatabaseId::new(u32::MAX);
/// Partial graph id used by unit tests; wraps `u32::MAX`.
#[cfg(test)]
pub(crate) const TEST_PARTIAL_GRAPH_ID: PartialGraphId = PartialGraphId(u32::MAX);
impl PartialGraphId {
fn new(id: u32) -> Self {
Self(id)
}
}
impl From<PartialGraphId> for u32 {
fn from(val: PartialGraphId) -> u32 {
val.0
}
}
/// Shared state used to wire actors together: the exchange channels between actor pairs,
/// the table of known actor infos, plus the address, client pool and streaming config that
/// `SharedContext::new` copies out of the `StreamEnvironment`.
pub struct SharedContext {
    /// Database id supplied to `SharedContext::new`.
    pub(crate) database_id: DatabaseId,
    /// Exchange channels keyed by actor-id pair. An entry is created on first access by
    /// `get_or_insert_channels`; each end of the pair can be taken at most once.
    channel_map: Mutex<HashMap<UpDownActorIds, ConsumableChannelPair>>,
    /// Actor info keyed by actor id; filled by `add_actors` and pruned by `drop_actors`.
    actor_infos: RwLock<HashMap<ActorId, ActorInfo>>,
    /// Address cloned from `StreamEnvironment::server_address()`.
    pub(crate) addr: HostAddr,
    /// Compute client pool obtained from `StreamEnvironment::client_pool()`.
    pub(crate) compute_client_pool: ComputeClientPoolRef,
    /// Owned copy of the streaming config taken from the environment; its `developer`
    /// section provides the permit settings used when a channel is created.
    pub(crate) config: StreamingConfig,
}
impl std::fmt::Debug for SharedContext {
    /// Prints only the `addr` field; all other fields are elided as non-exhaustive.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("SharedContext");
        builder.field("addr", &self.addr);
        builder.finish_non_exhaustive()
    }
}
impl SharedContext {
pub fn new(database_id: DatabaseId, env: &StreamEnvironment) -> Self {
Self {
database_id,
channel_map: Default::default(),
actor_infos: Default::default(),
addr: env.server_address().clone(),
config: env.config().as_ref().to_owned(),
compute_client_pool: env.client_pool(),
}
}
#[cfg(test)]
pub fn for_test() -> Self {
use std::sync::Arc;
use risingwave_common::config::StreamingDeveloperConfig;
use risingwave_rpc_client::ComputeClientPool;
Self {
database_id: TEST_DATABASE_ID,
channel_map: Default::default(),
actor_infos: Default::default(),
addr: LOCAL_TEST_ADDR.clone(),
config: StreamingConfig {
developer: StreamingDeveloperConfig {
exchange_initial_permits: permit::for_test::INITIAL_PERMITS,
exchange_batched_permits: permit::for_test::BATCHED_PERMITS,
exchange_concurrent_barriers: permit::for_test::CONCURRENT_BARRIERS,
..Default::default()
},
..Default::default()
},
compute_client_pool: Arc::new(ComputeClientPool::for_test()),
}
}
fn get_or_insert_channels(
&self,
ids: UpDownActorIds,
) -> MappedMutexGuard<'_, ConsumableChannelPair> {
MutexGuard::map(self.channel_map.lock(), |map| {
map.entry(ids).or_insert_with(|| {
let (tx, rx) = permit::channel(
self.config.developer.exchange_initial_permits,
self.config.developer.exchange_batched_permits,
self.config.developer.exchange_concurrent_barriers,
);
(Some(tx), Some(rx))
})
})
}
pub fn take_sender(&self, ids: &UpDownActorIds) -> StreamResult<Sender> {
self.get_or_insert_channels(*ids)
.0
.take()
.ok_or_else(|| anyhow!("sender for {ids:?} has already been taken").into())
}
pub fn take_receiver(&self, ids: UpDownActorIds) -> StreamResult<Receiver> {
self.get_or_insert_channels(ids)
.1
.take()
.ok_or_else(|| anyhow!("receiver for {ids:?} has already been taken").into())
}
pub fn get_actor_info(&self, actor_id: &ActorId) -> StreamResult<ActorInfo> {
self.actor_infos
.read()
.get(actor_id)
.cloned()
.ok_or_else(|| anyhow!("actor {} not found in info table", actor_id).into())
}
pub fn config(&self) -> &StreamingConfig {
&self.config
}
pub(super) fn drop_actors(&self, actors: &HashSet<ActorId>) {
self.channel_map
.lock()
.retain(|(up_id, _), _| !actors.contains(up_id));
let mut actor_infos = self.actor_infos.write();
for actor_id in actors {
actor_infos.remove(actor_id);
}
}
pub(crate) fn add_actors(&self, new_actor_infos: impl Iterator<Item = ActorInfo>) {
let mut actor_infos = self.actor_infos.write();
for actor in new_actor_infos {
if let Some(prev_actor) = actor_infos.get(&actor.get_actor_id()) {
if cfg!(debug_assertions) {
panic!("duplicate actor info: {:?} {:?}", actor, actor_infos);
}
if prev_actor != &actor {
warn!(
?prev_actor,
?actor,
"add actor again but have different actor info. ignored"
);
}
} else {
actor_infos.insert(actor.get_actor_id(), actor);
}
}
}
}
/// Combines an actor id and an operator id into a single `u64`: the actor id occupies the
/// upper 32 bits and the operator id the lower 32 bits.
///
/// # Panics
/// Panics if `operator_id` does not fit in 32 bits.
pub fn unique_executor_id(actor_id: u32, operator_id: u64) -> u64 {
    assert!(operator_id <= u32::MAX as u64);
    let upper_half = u64::from(actor_id) << 32;
    // The lower 32 bits of `upper_half` are zero, so OR-ing equals adding here.
    upper_half | operator_id
}
/// Combines a fragment id and an operator id into a single `u64`: the fragment id occupies
/// the upper 32 bits and the operator id the lower 32 bits.
///
/// # Panics
/// Panics if `operator_id` does not fit in 32 bits.
pub fn unique_operator_id(fragment_id: u32, operator_id: u64) -> u64 {
    assert!(operator_id <= u32::MAX as u64);
    let upper_half = u64::from(fragment_id) << 32;
    // The lower 32 bits of `upper_half` are zero, so OR-ing equals adding here.
    upper_half | operator_id
}