use std::collections::HashSet;
use risingwave_common::catalog::TableVersionId;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::{bail_not_implemented, current_cluster_version};
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::{SourceModel, TableModel};
use risingwave_meta_model::{source, table, SourceId, TableId, TableVersion};
use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table};
use risingwave_pb::ddl_service::TableJobType;
use sea_orm::entity::prelude::*;
use sea_orm::{DatabaseTransaction, QuerySelect};
use strum::{EnumIs, EnumTryAs};
use super::{
get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source,
get_referred_secret_ids_from_sink, get_referred_secret_ids_from_source,
};
use crate::stream::StreamFragmentGraph;
use crate::{MetaError, MetaResult};
/// A streaming job, tagged by the kind of catalog object it builds and carrying the
/// protobuf catalog(s) describing that object.
#[derive(Debug, Clone, EnumIs, EnumTryAs)]
pub enum StreamingJob {
/// A materialized view, described by its `Table` catalog.
MaterializedView(Table),
/// A sink, plus an optional `(Table, Option<PbSource>)` pair.
/// NOTE(review): the pair presumably describes a table the sink writes into — confirm
/// against the callers; nothing in this file reads it.
Sink(Sink, Option<(Table, Option<PbSource>)>),
/// A table, with its optional associated source catalog and the table job type.
Table(Option<PbSource>, Table, TableJobType),
/// An index together with the `Table` catalog of its index table.
Index(Index, Table),
/// A source, described by its `PbSource` catalog.
Source(PbSource),
}
/// The kind of a [`StreamingJob`] without its catalog payload.
///
/// Only the `Table` variant keeps extra data: the `TableJobType` copied from the job.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum StreamingJobType {
/// Kind of [`StreamingJob::MaterializedView`].
MaterializedView,
/// Kind of [`StreamingJob::Sink`].
Sink,
/// Kind of [`StreamingJob::Table`], carrying the job's `TableJobType`.
Table(TableJobType),
/// Kind of [`StreamingJob::Index`].
Index,
/// Kind of [`StreamingJob::Source`].
Source,
}
/// Derives the payload-free [`StreamingJobType`] from a borrowed [`StreamingJob`].
impl From<&StreamingJob> for StreamingJobType {
    /// Maps each job variant to the type variant of the same name; for tables the
    /// `TableJobType` is copied over.
    fn from(job: &StreamingJob) -> Self {
        match job {
            StreamingJob::MaterializedView(..) => Self::MaterializedView,
            StreamingJob::Sink(..) => Self::Sink,
            StreamingJob::Table(_, _, table_job_type) => Self::Table(*table_job_type),
            StreamingJob::Index(..) => Self::Index,
            StreamingJob::Source(..) => Self::Source,
        }
    }
}
/// Test-only default: a streaming job type defaults to `MaterializedView`.
#[cfg(test)]
#[allow(clippy::derivable_impls)]
impl Default for StreamingJobType {
    fn default() -> Self {
        Self::MaterializedView
    }
}
impl StreamingJob {
    /// Stamps the job's catalog(s) with creation metadata.
    ///
    /// Every variant records both `created_at_epoch` (the current epoch) and
    /// `created_at_cluster_version` (the current cluster version). A table that carries an
    /// associated source stamps both catalogs with the same values.
    pub fn mark_created(&mut self) {
        let created_at_epoch = Some(Epoch::now().0);
        let created_at_cluster_version = Some(current_cluster_version());
        match self {
            StreamingJob::MaterializedView(table) => {
                table.created_at_epoch = created_at_epoch;
                table.created_at_cluster_version = created_at_cluster_version;
            }
            StreamingJob::Sink(sink, _) => {
                sink.created_at_epoch = created_at_epoch;
                // Record the cluster version as well, so sinks carry the same creation
                // metadata as every other job kind (mirrors `mark_initialized`).
                sink.created_at_cluster_version = created_at_cluster_version;
            }
            StreamingJob::Table(source, table, ..) => {
                table.created_at_epoch = created_at_epoch;
                // Clone into the table so the owned value can still be moved into the
                // optional source below.
                table
                    .created_at_cluster_version
                    .clone_from(&created_at_cluster_version);
                if let Some(source) = source {
                    source.created_at_epoch = created_at_epoch;
                    source.created_at_cluster_version = created_at_cluster_version;
                }
            }
            StreamingJob::Index(index, _) => {
                index.created_at_epoch = created_at_epoch;
                index.created_at_cluster_version = created_at_cluster_version;
            }
            StreamingJob::Source(source) => {
                source.created_at_epoch = created_at_epoch;
                source.created_at_cluster_version = created_at_cluster_version;
            }
        }
    }

    /// Stamps the job's catalog(s) with initialization metadata.
    ///
    /// Every variant records both `initialized_at_epoch` (the current epoch) and
    /// `initialized_at_cluster_version` (the current cluster version). A table that carries
    /// an associated source stamps both catalogs with the same values.
    pub fn mark_initialized(&mut self) {
        let initialized_at_epoch = Some(Epoch::now().0);
        let initialized_at_cluster_version = Some(current_cluster_version());
        match self {
            StreamingJob::MaterializedView(table) => {
                table.initialized_at_epoch = initialized_at_epoch;
                table.initialized_at_cluster_version = initialized_at_cluster_version;
            }
            StreamingJob::Sink(sink, _) => {
                sink.initialized_at_epoch = initialized_at_epoch;
                sink.initialized_at_cluster_version = initialized_at_cluster_version;
            }
            StreamingJob::Table(source, table, ..) => {
                table.initialized_at_epoch = initialized_at_epoch;
                // Clone into the table so the owned value can still be moved into the
                // optional source below.
                table
                    .initialized_at_cluster_version
                    .clone_from(&initialized_at_cluster_version);
                if let Some(source) = source {
                    source.initialized_at_epoch = initialized_at_epoch;
                    source.initialized_at_cluster_version = initialized_at_cluster_version;
                }
            }
            StreamingJob::Index(index, _) => {
                index.initialized_at_epoch = initialized_at_epoch;
                index.initialized_at_cluster_version = initialized_at_cluster_version;
            }
            StreamingJob::Source(source) => {
                source.initialized_at_epoch = initialized_at_epoch;
                source.initialized_at_cluster_version = initialized_at_cluster_version;
            }
        }
    }
}
impl StreamingJob {
pub fn set_id(&mut self, id: u32) {
match self {
Self::MaterializedView(table) => table.id = id,
Self::Sink(sink, _) => sink.id = id,
Self::Table(_, table, ..) => table.id = id,
Self::Index(index, index_table) => {
index.id = id;
index.index_table_id = id;
index_table.id = id;
}
StreamingJob::Source(src) => {
src.id = id;
}
}
}
pub fn set_table_vnode_count(&mut self, vnode_count: usize) {
match self {
Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
table.maybe_vnode_count = Some(vnode_count as u32);
}
Self::Sink(_, _) | Self::Source(_) => {}
}
}
pub fn set_info_from_graph(&mut self, graph: &StreamFragmentGraph) {
match self {
Self::Table(_, table, ..) => {
table.fragment_id = graph.table_fragment_id();
table.dml_fragment_id = graph.dml_fragment_id();
}
Self::MaterializedView(table) | Self::Index(_, table) => {
table.fragment_id = graph.table_fragment_id();
}
Self::Sink(_, _) | Self::Source(_) => {}
}
}
pub fn id(&self) -> u32 {
match self {
Self::MaterializedView(table) => table.id,
Self::Sink(sink, _) => sink.id,
Self::Table(_, table, ..) => table.id,
Self::Index(index, _) => index.id,
Self::Source(source) => source.id,
}
}
pub fn mv_table(&self) -> Option<u32> {
match self {
Self::MaterializedView(table) => Some(table.id),
Self::Sink(_, _) => None,
Self::Table(_, table, ..) => Some(table.id),
Self::Index(_, table) => Some(table.id),
Self::Source(_) => None,
}
}
pub fn table(&self) -> Option<&Table> {
match self {
Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
Some(table)
}
Self::Sink(_, _) | Self::Source(_) => None,
}
}
pub fn schema_id(&self) -> u32 {
match self {
Self::MaterializedView(table) => table.schema_id,
Self::Sink(sink, _) => sink.schema_id,
Self::Table(_, table, ..) => table.schema_id,
Self::Index(index, _) => index.schema_id,
Self::Source(source) => source.schema_id,
}
}
pub fn database_id(&self) -> u32 {
match self {
Self::MaterializedView(table) => table.database_id,
Self::Sink(sink, _) => sink.database_id,
Self::Table(_, table, ..) => table.database_id,
Self::Index(index, _) => index.database_id,
Self::Source(source) => source.database_id,
}
}
pub fn name(&self) -> String {
match self {
Self::MaterializedView(table) => table.name.clone(),
Self::Sink(sink, _) => sink.name.clone(),
Self::Table(_, table, ..) => table.name.clone(),
Self::Index(index, _) => index.name.clone(),
Self::Source(source) => source.name.clone(),
}
}
pub fn owner(&self) -> u32 {
match self {
StreamingJob::MaterializedView(mv) => mv.owner,
StreamingJob::Sink(sink, _) => sink.owner,
StreamingJob::Table(_, table, ..) => table.owner,
StreamingJob::Index(index, _) => index.owner,
StreamingJob::Source(source) => source.owner,
}
}
pub fn job_type(&self) -> StreamingJobType {
self.into()
}
pub fn job_type_str(&self) -> &'static str {
match self {
StreamingJob::MaterializedView(_) => "materialized view",
StreamingJob::Sink(_, _) => "sink",
StreamingJob::Table(_, _, _) => "table",
StreamingJob::Index(_, _) => "index",
StreamingJob::Source(_) => "source",
}
}
pub fn definition(&self) -> String {
match self {
Self::MaterializedView(table) => table.definition.clone(),
Self::Table(_, table, ..) => table.definition.clone(),
Self::Index(_, table) => table.definition.clone(),
Self::Sink(sink, _) => sink.definition.clone(),
Self::Source(source) => source.definition.clone(),
}
}
pub fn object_type(&self) -> ObjectType {
match self {
Self::MaterializedView(_) => ObjectType::Table, Self::Sink(_, _) => ObjectType::Sink,
Self::Table(_, _, _) => ObjectType::Table,
Self::Index(_, _) => ObjectType::Index,
Self::Source(_) => ObjectType::Source,
}
}
pub fn table_version_id(&self) -> Option<TableVersionId> {
if let Self::Table(_, table, ..) = self {
Some(
table
.get_version()
.expect("table must be versioned")
.version,
)
} else {
None
}
}
pub fn create_type(&self) -> CreateType {
match self {
Self::MaterializedView(table) => {
table.get_create_type().unwrap_or(CreateType::Foreground)
}
Self::Sink(s, _) => s.get_create_type().unwrap_or(CreateType::Foreground),
_ => CreateType::Foreground,
}
}
pub fn dependent_relations(&self) -> Vec<u32> {
match self {
StreamingJob::MaterializedView(table) => table.dependent_relations.clone(),
StreamingJob::Sink(_sink, _) => vec![], StreamingJob::Table(_, table, _) => table.dependent_relations.clone(), StreamingJob::Index(index, index_table) => {
assert_eq!(index.primary_table_id, index_table.dependent_relations[0]);
vec![]
}
StreamingJob::Source(_) => vec![],
}
}
pub fn dependent_connection_ids(&self) -> MetaResult<HashSet<u32>> {
match self {
StreamingJob::Source(source) => Ok(get_referred_connection_ids_from_source(source)),
StreamingJob::Table(source, _, _) => {
if let Some(source) = source {
Ok(get_referred_connection_ids_from_source(source))
} else {
Ok(HashSet::new())
}
}
StreamingJob::Sink(sink, _) => Ok(get_referred_connection_ids_from_sink(sink)),
StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
}
}
pub fn dependent_secret_ids(&self) -> MetaResult<HashSet<u32>> {
match self {
StreamingJob::Sink(sink, _) => Ok(get_referred_secret_ids_from_sink(sink)),
StreamingJob::Table(source, _, _) => {
if let Some(source) = source {
get_referred_secret_ids_from_source(source)
} else {
Ok(HashSet::new())
}
}
StreamingJob::Source(source) => get_referred_secret_ids_from_source(source),
StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
}
}
pub async fn verify_version_for_replace(&self, txn: &DatabaseTransaction) -> MetaResult<()> {
let id = self.id();
match self {
StreamingJob::Table(_source, table, _table_job_type) => {
let new_version = table.get_version()?.get_version();
let original_version: Option<TableVersion> = TableModel::find_by_id(id as TableId)
.select_only()
.column(table::Column::Version)
.into_tuple()
.one(txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
let original_version = original_version
.expect("version for table should exist")
.to_protobuf();
if new_version != original_version.version + 1 {
return Err(MetaError::permission_denied("table version is stale"));
}
}
StreamingJob::Source(source) => {
let new_version = source.get_version();
let original_version: Option<i64> = SourceModel::find_by_id(id as SourceId)
.select_only()
.column(source::Column::Version)
.into_tuple()
.one(txn)
.await?
.ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
let original_version = original_version.expect("version for source should exist");
if new_version != original_version as u64 + 1 {
return Err(MetaError::permission_denied("source version is stale"));
}
}
StreamingJob::MaterializedView(_)
| StreamingJob::Sink(_, _)
| StreamingJob::Index(_, _) => {
bail_not_implemented!("schema change for {}", self.job_type_str())
}
}
Ok(())
}
}