use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::ops::AddAssign;
use itertools::Itertools;
use risingwave_common::catalog::TableId;
use risingwave_common::hash::{VirtualNode, WorkerSlotId};
use risingwave_common::util::stream_graph_visitor;
use risingwave_connector::source::SplitImpl;
use risingwave_meta_model::{SourceId, WorkerId};
use risingwave_pb::catalog::Table;
use risingwave_pb::common::PbActorLocation;
use risingwave_pb::meta::table_fragments::actor_status::ActorState;
use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State};
use risingwave_pb::meta::table_parallelism::{
FixedParallelism, Parallelism, PbAdaptiveParallelism, PbCustomParallelism, PbFixedParallelism,
PbParallelism,
};
use risingwave_pb::meta::{PbTableFragments, PbTableParallelism};
use risingwave_pb::plan_common::PbExprContext;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{FragmentTypeFlag, PbStreamContext, StreamActor, StreamNode};
use super::{ActorId, FragmentId};
use crate::model::MetadataModelResult;
use crate::stream::{build_actor_connector_splits, build_actor_split_impls, SplitAssignment};
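/// The parallelism policy of a streaming job. Corresponds to [`PbTableParallelism`].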
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum TableParallelism {
    /// Let the scheduler decide; the parallelism adapts to the available compute resources.
    Adaptive,
    /// Pin the job to a fixed degree of parallelism.
    Fixed(usize),
    /// The parallelism is customized per fragment and is not adjusted automatically.
    Custom,
}
impl From<PbTableParallelism> for TableParallelism {
fn from(value: PbTableParallelism) -> Self {
use Parallelism::*;
match &value.parallelism {
Some(Fixed(FixedParallelism { parallelism: n })) => Self::Fixed(*n as usize),
Some(Adaptive(_)) | Some(Auto(_)) => Self::Adaptive,
Some(Custom(_)) => Self::Custom,
_ => unreachable!(),
}
}
}
impl From<TableParallelism> for PbTableParallelism {
fn from(value: TableParallelism) -> Self {
use TableParallelism::*;
let parallelism = match value {
Adaptive => PbParallelism::Adaptive(PbAdaptiveParallelism {}),
Fixed(n) => PbParallelism::Fixed(PbFixedParallelism {
parallelism: n as u32,
}),
Custom => PbParallelism::Custom(PbCustomParallelism {}),
};
Self {
parallelism: Some(parallelism),
}
}
}
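/// Fragments of a streaming job, together with the status and split assignment of their actors.
/// Corresponds to [`PbTableFragments`].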
#[derive(Debug, Clone)]
pub struct StreamJobFragments {
    /// The id of the streaming job.
    stream_job_id: TableId,
    /// The state of the job fragments.
    state: State,
    /// The fragments of the job, keyed by fragment id.
    pub fragments: BTreeMap<FragmentId, Fragment>,
    /// The location and state of each actor.
    pub actor_status: BTreeMap<ActorId, ActorStatus>,
    /// The source splits assigned to each actor.
    pub actor_splits: HashMap<ActorId, Vec<SplitImpl>>,
    /// The streaming context (e.g. timezone) associated with the job.
    pub ctx: StreamContext,
    /// The parallelism assigned to the job.
    pub assigned_parallelism: TableParallelism,
    /// The max parallelism specified when the job was created, i.e. the expected vnode count.
    pub max_parallelism: usize,
}
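/// The runtime context of a streaming job; currently only carries the timezone.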
#[derive(Debug, Clone, Default)]
pub struct StreamContext {
    /// The timezone used by the job; `None` if not set.
    pub timezone: Option<String>,
}
impl StreamContext {
pub fn to_protobuf(&self) -> PbStreamContext {
PbStreamContext {
timezone: self.timezone.clone().unwrap_or("".into()),
}
}
pub fn to_expr_context(&self) -> PbExprContext {
PbExprContext {
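            // The timezone is expected to always be set here; an obviously invalid
            // placeholder is used so that a missing value surfaces quickly.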
time_zone: self.timezone.clone().unwrap_or("Empty Time Zone".into()),
}
}
pub fn from_protobuf(prost: &PbStreamContext) -> Self {
Self {
timezone: if prost.get_timezone().is_empty() {
None
} else {
Some(prost.get_timezone().clone())
},
}
}
}
impl StreamJobFragments {
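    /// Converts the job fragments to their protobuf representation ([`PbTableFragments`]).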
pub fn to_protobuf(&self) -> PbTableFragments {
PbTableFragments {
table_id: self.stream_job_id.table_id(),
state: self.state as _,
fragments: self.fragments.clone().into_iter().collect(),
actor_status: self.actor_status.clone().into_iter().collect(),
actor_splits: build_actor_connector_splits(&self.actor_splits),
ctx: Some(self.ctx.to_protobuf()),
parallelism: Some(self.assigned_parallelism.into()),
node_label: "".to_string(),
backfill_done: true,
max_parallelism: Some(self.max_parallelism as _),
}
}
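    /// Restores the job fragments from the protobuf representation. Jobs persisted without a
    /// recorded parallelism are treated as `Custom`.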
pub fn from_protobuf(prost: PbTableFragments) -> Self {
let ctx = StreamContext::from_protobuf(prost.get_ctx().unwrap());
let default_parallelism = PbTableParallelism {
parallelism: Some(Parallelism::Custom(PbCustomParallelism {})),
};
let state = prost.state();
Self {
stream_job_id: TableId::new(prost.table_id),
state,
fragments: prost.fragments.into_iter().collect(),
actor_status: prost.actor_status.into_iter().collect(),
actor_splits: build_actor_split_impls(&prost.actor_splits),
ctx,
assigned_parallelism: prost.parallelism.unwrap_or(default_parallelism).into(),
max_parallelism: prost
.max_parallelism
.map_or(VirtualNode::COUNT_FOR_COMPAT, |v| v as _),
}
}
}
impl StreamJobFragments {
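    /// Creates job fragments for testing, with adaptive parallelism and no actor locations.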
pub fn for_test(table_id: TableId, fragments: BTreeMap<FragmentId, Fragment>) -> Self {
Self::new(
table_id,
fragments,
&BTreeMap::new(),
StreamContext::default(),
TableParallelism::Adaptive,
VirtualNode::COUNT_FOR_TEST,
)
}
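    /// Creates a new `StreamJobFragments` in `Initial` state, with every actor marked
    /// `Inactive` at the worker slot given by `actor_locations`.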
pub fn new(
stream_job_id: TableId,
fragments: BTreeMap<FragmentId, Fragment>,
actor_locations: &BTreeMap<ActorId, WorkerSlotId>,
ctx: StreamContext,
table_parallelism: TableParallelism,
max_parallelism: usize,
) -> Self {
let actor_status = actor_locations
.iter()
.map(|(&actor_id, worker_slot_id)| {
(
actor_id,
ActorStatus {
location: PbActorLocation::from_worker(worker_slot_id.worker_id()),
state: ActorState::Inactive as i32,
},
)
})
.collect();
Self {
stream_job_id,
state: State::Initial,
fragments,
actor_status,
actor_splits: HashMap::default(),
ctx,
assigned_parallelism: table_parallelism,
max_parallelism,
}
}
pub fn fragment_ids(&self) -> impl Iterator<Item = FragmentId> + '_ {
self.fragments.keys().cloned()
}
pub fn fragments(&self) -> impl Iterator<Item = &Fragment> {
self.fragments.values()
}
pub fn stream_job_id(&self) -> TableId {
self.stream_job_id
}
pub fn state(&self) -> State {
self.state
}
pub fn timezone(&self) -> Option<String> {
self.ctx.timezone.clone()
}
pub fn is_created(&self) -> bool {
self.state == State::Created
}
pub fn is_initial(&self) -> bool {
self.state == State::Initial
}
pub fn set_state(&mut self, state: State) {
self.state = state;
}
pub fn update_actors_state(&mut self, state: ActorState) {
for actor_status in self.actor_status.values_mut() {
actor_status.set_state(state);
}
}
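    /// Replaces the actor split assignment with the given one, flattened across fragments.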
pub fn set_actor_splits_by_split_assignment(&mut self, split_assignment: SplitAssignment) {
self.actor_splits = split_assignment.into_values().flatten().collect();
}
pub fn actor_ids(&self) -> Vec<ActorId> {
self.fragments
.values()
.flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
.collect()
}
pub fn actor_fragment_mapping(&self) -> HashMap<ActorId, FragmentId> {
self.fragments
.values()
.flat_map(|fragment| {
fragment
.actors
.iter()
.map(|actor| (actor.actor_id, fragment.fragment_id))
})
.collect()
}
pub fn actors(&self) -> Vec<StreamActor> {
self.fragments
.values()
.flat_map(|fragment| fragment.actors.clone())
.collect()
}
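    /// Returns the ids of actors whose fragment type mask satisfies `check_type`.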
pub fn filter_actor_ids(
&self,
check_type: impl Fn(u32) -> bool + 'static,
) -> impl Iterator<Item = ActorId> + '_ {
self.fragments
.values()
.filter(move |fragment| check_type(fragment.get_fragment_type_mask()))
.flat_map(|fragment| fragment.actors.iter().map(|actor| actor.actor_id))
}
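    /// Returns the ids of actors in `Mview` (materialize) fragments.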
pub fn mview_actor_ids(&self) -> Vec<ActorId> {
Self::filter_actor_ids(self, |fragment_type_mask| {
(fragment_type_mask & FragmentTypeFlag::Mview as u32) != 0
})
.collect()
}
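    /// Returns the actors whose backfill progress should be tracked, together with the type of
    /// their backfill upstream. Jobs containing a `CdcFilter` fragment are not tracked and
    /// yield an empty vec.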
pub fn tracking_progress_actor_ids(&self) -> Vec<(ActorId, BackfillUpstreamType)> {
let mut actor_ids = vec![];
for fragment in self.fragments.values() {
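            // CDC table jobs contain a `CdcFilter` fragment; their backfill progress is not tracked.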
if fragment.fragment_type_mask & FragmentTypeFlag::CdcFilter as u32 != 0 {
return vec![];
}
if (fragment.fragment_type_mask
& (FragmentTypeFlag::Values as u32
| FragmentTypeFlag::StreamScan as u32
| FragmentTypeFlag::SourceScan as u32))
!= 0
{
actor_ids.extend(fragment.actors.iter().map(|actor| {
(
actor.actor_id,
BackfillUpstreamType::from_fragment_type_mask(fragment.fragment_type_mask),
)
}));
}
}
actor_ids
}
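    /// Returns the fragment containing the `Mview` (materialize) node, if any.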
pub fn mview_fragment(&self) -> Option<Fragment> {
self.fragments
.values()
.find(|fragment| {
(fragment.get_fragment_type_mask() & FragmentTypeFlag::Mview as u32) != 0
})
.cloned()
}
pub fn source_fragment(&self) -> Option<Fragment> {
self.fragments
.values()
.find(|fragment| {
(fragment.get_fragment_type_mask() & FragmentTypeFlag::Source as u32) != 0
})
.cloned()
}
pub fn sink_fragment(&self) -> Option<Fragment> {
self.fragments
.values()
.find(|fragment| {
(fragment.get_fragment_type_mask() & FragmentTypeFlag::Sink as u32) != 0
})
.cloned()
}
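    /// Returns the ids of actors in `SnapshotBackfillStreamScan` fragments.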
pub fn snapshot_backfill_actor_ids(&self) -> HashSet<ActorId> {
Self::filter_actor_ids(self, |mask| {
(mask & FragmentTypeFlag::SnapshotBackfillStreamScan as u32) != 0
})
.collect()
}
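    /// Extracts the fragments that contain a stream source node, grouped by source id.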
pub fn stream_source_fragments(&self) -> HashMap<SourceId, BTreeSet<FragmentId>> {
let mut source_fragments = HashMap::new();
for fragment in self.fragments() {
for actor in &fragment.actors {
if let Some(source_id) = actor.nodes.as_ref().unwrap().find_stream_source() {
source_fragments
.entry(source_id as SourceId)
.or_insert(BTreeSet::new())
.insert(fragment.fragment_id as FragmentId);
break;
}
}
}
source_fragments
}
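    /// Returns, for each source id, the set of `(source backfill fragment id, upstream source
    /// fragment id)` pairs. Errors if a `SourceBackfill` fragment has more than one upstream
    /// fragment.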
pub fn source_backfill_fragments(
&self,
) -> MetadataModelResult<HashMap<SourceId, BTreeSet<(FragmentId, FragmentId)>>> {
let mut source_fragments = HashMap::new();
for fragment in self.fragments() {
for actor in &fragment.actors {
if let Some(source_id) = actor.nodes.as_ref().unwrap().find_source_backfill() {
                    if fragment.upstream_fragment_ids.len() != 1 {
                        return Err(anyhow::anyhow!(
                            "SourceBackfill should have only one upstream fragment, found {:?} for fragment {}",
                            fragment.upstream_fragment_ids,
                            fragment.fragment_id
                        )
                        .into());
                    }
source_fragments
.entry(source_id as SourceId)
.or_insert(BTreeSet::new())
.insert((fragment.fragment_id, fragment.upstream_fragment_ids[0]));
break;
}
}
}
Ok(source_fragments)
}
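    /// Recursively collects the tables scanned by `StreamScan` / `StreamCdcScan` nodes in
    /// `stream_node`, counting how many times each table is referenced.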
fn resolve_dependent_table(stream_node: &StreamNode, table_ids: &mut HashMap<TableId, usize>) {
let table_id = match stream_node.node_body.as_ref() {
Some(NodeBody::StreamScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
Some(NodeBody::StreamCdcScan(stream_scan)) => Some(TableId::new(stream_scan.table_id)),
_ => None,
};
if let Some(table_id) = table_id {
table_ids.entry(table_id).or_default().add_assign(1);
}
for child in &stream_node.input {
Self::resolve_dependent_table(child, table_ids);
}
}
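    /// Returns the upstream table ids this job depends on, together with the number of scan
    /// nodes referencing each of them.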
pub fn dependent_table_ids(&self) -> HashMap<TableId, usize> {
let mut table_ids = HashMap::new();
self.fragments.values().for_each(|fragment| {
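            // All actors of a fragment share the same stream node structure, so inspecting the
            // first actor is sufficient.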
let actor = &fragment.actors[0];
Self::resolve_dependent_table(actor.nodes.as_ref().unwrap(), &mut table_ids);
});
table_ids
}
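    /// Groups `(actor id, actor state)` pairs by the worker the actor is located on.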
pub fn worker_actor_states(&self) -> BTreeMap<WorkerId, Vec<(ActorId, ActorState)>> {
let mut map = BTreeMap::default();
for (&actor_id, actor_status) in &self.actor_status {
let node_id = actor_status.worker_id() as WorkerId;
map.entry(node_id)
.or_insert_with(Vec::new)
.push((actor_id, actor_status.state()));
}
map
}
pub fn worker_actor_ids(&self) -> BTreeMap<WorkerId, Vec<ActorId>> {
let mut map = BTreeMap::default();
for (&actor_id, actor_status) in &self.actor_status {
let node_id = actor_status.worker_id() as WorkerId;
map.entry(node_id).or_insert_with(Vec::new).push(actor_id);
}
map
}
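    /// Returns all actors that are not in the `Inactive` state.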
pub fn active_actors(&self) -> Vec<StreamActor> {
let mut actors = vec![];
for fragment in self.fragments.values() {
for actor in &fragment.actors {
if self.actor_status[&actor.actor_id].state == ActorState::Inactive as i32 {
continue;
}
actors.push(actor.clone());
}
}
actors
}
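    /// Groups all actors by the worker they are located on.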
pub fn actors_to_create(&self) -> HashMap<WorkerId, Vec<StreamActor>> {
let mut actor_map: HashMap<_, Vec<_>> = HashMap::new();
self.fragments
.values()
.flat_map(|fragment| fragment.actors.iter())
.for_each(|actor| {
let worker_id = self
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.worker_id() as WorkerId;
actor_map.entry(worker_id).or_default().push(actor.clone());
});
actor_map
}
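    /// Returns the id of the table materialized by this job (equal to the job id), if it
    /// appears among the state table ids.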
pub fn mv_table_id(&self) -> Option<u32> {
if self
.fragments
.values()
.flat_map(|f| f.state_table_ids.iter())
.any(|table_id| *table_id == self.stream_job_id.table_id)
{
Some(self.stream_job_id.table_id)
} else {
None
}
}
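    /// Returns the internal state tables of the job, excluding the table materialized by the
    /// job itself.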
pub fn internal_tables(&self) -> BTreeMap<u32, Table> {
self.collect_tables_inner(true)
}
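    /// Returns all tables of the job, including internal state tables and the materialized table.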
pub fn all_tables(&self) -> BTreeMap<u32, Table> {
self.collect_tables_inner(false)
}
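    /// Collects tables by visiting the stream node of the first actor of each fragment.
    /// Panics on duplicated table ids.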
fn collect_tables_inner(&self, internal_tables_only: bool) -> BTreeMap<u32, Table> {
let mut tables = BTreeMap::new();
for fragment in self.fragments.values() {
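            // Visiting the first actor is sufficient: all actors of a fragment reference the
            // same set of tables.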
stream_graph_visitor::visit_stream_node_tables_inner(
&mut fragment.actors[0].nodes.clone().unwrap(),
internal_tables_only,
true,
|table, _| {
let table_id = table.id;
tables
.try_insert(table_id, table.clone())
.unwrap_or_else(|_| panic!("duplicated table id `{}`", table_id));
},
);
}
tables
}
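    /// Returns the ids of all internal state tables, i.e. every state table id except the job id.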
pub fn internal_table_ids(&self) -> Vec<u32> {
self.fragments
.values()
.flat_map(|f| f.state_table_ids.clone())
.filter(|&t| t != self.stream_job_id.table_id)
.collect_vec()
}
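    /// Returns the ids of all state tables, including the materialized table if present.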
pub fn all_table_ids(&self) -> impl Iterator<Item = u32> + '_ {
self.fragments
.values()
.flat_map(|f| f.state_table_ids.clone())
}
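    /// Fills the expression context for every actor that does not have one, derived from the
    /// job's stream context.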
pub fn fill_expr_context(mut self) -> Self {
self.fragments.values_mut().for_each(|fragment| {
fragment.actors.iter_mut().for_each(|actor| {
if actor.expr_context.is_none() {
actor.expr_context = Some(self.ctx.to_expr_context());
}
});
});
self
}
}
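/// The type of upstream that a backfill fragment reads from.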
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackfillUpstreamType {
MView,
Values,
Source,
}
impl BackfillUpstreamType {
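    /// Derives the backfill upstream type from a fragment type mask. Exactly one of the
    /// backfill-related flags is expected to be set.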
pub fn from_fragment_type_mask(mask: u32) -> Self {
let is_mview = (mask & FragmentTypeFlag::StreamScan as u32) != 0;
let is_values = (mask & FragmentTypeFlag::Values as u32) != 0;
let is_source = (mask & FragmentTypeFlag::SourceScan as u32) != 0;
        debug_assert!(
            is_mview as u8 + is_values as u8 + is_source as u8 == 1,
            "a backfill fragment should be exactly one of mview, values, or source, found fragment type mask {:?}",
            mask
        );
if is_mview {
BackfillUpstreamType::MView
} else if is_values {
BackfillUpstreamType::Values
} else if is_source {
BackfillUpstreamType::Source
} else {
unreachable!("invalid fragment type mask: {}", mask);
}
}
}