1use std::collections::HashSet;
16
17use risingwave_common::bail_not_implemented;
18use risingwave_common::catalog::TableVersionId;
19use risingwave_common::id::JobId;
20use risingwave_meta_model::object::ObjectType;
21use risingwave_meta_model::prelude::{SourceModel, TableModel};
22use risingwave_meta_model::{SourceId, TableVersion, source, table};
23use risingwave_pb::catalog::{CreateType, Index, PbSource, Sink, Table};
24use risingwave_pb::ddl_service::TableJobType;
25use sea_orm::entity::prelude::*;
26use sea_orm::{DatabaseTransaction, QuerySelect};
27use strum::{EnumIs, EnumTryAs};
28
29use super::{
30 get_referred_connection_ids_from_sink, get_referred_connection_ids_from_source,
31 get_referred_secret_ids_from_sink, get_referred_secret_ids_from_source,
32};
33use crate::stream::StreamFragmentGraph;
34use crate::{MetaError, MetaResult};
35
36#[derive(Debug, Clone, EnumIs, EnumTryAs)]
39pub enum StreamingJob {
40 MaterializedView(Table),
41 Sink(Sink),
42 Table(Option<PbSource>, Table, TableJobType),
43 Index(Index, Table),
44 Source(PbSource),
45}
46
47impl std::fmt::Display for StreamingJob {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 match self {
50 StreamingJob::MaterializedView(table) => {
51 write!(f, "MaterializedView: {}({})", table.name, table.id)
52 }
53 StreamingJob::Sink(sink) => write!(f, "Sink: {}({})", sink.name, sink.id),
54 StreamingJob::Table(_, table, _) => write!(f, "Table: {}({})", table.name, table.id),
55 StreamingJob::Index(index, _) => write!(f, "Index: {}({})", index.name, index.id),
56 StreamingJob::Source(source) => write!(f, "Source: {}({})", source.name, source.id),
57 }
58 }
59}
60
61#[derive(Debug, Clone, Copy, PartialEq)]
62pub enum StreamingJobType {
63 MaterializedView,
64 Sink,
65 Table(TableJobType),
66 Index,
67 Source,
68}
69
70impl From<&StreamingJob> for StreamingJobType {
71 fn from(job: &StreamingJob) -> Self {
72 match job {
73 StreamingJob::MaterializedView(_) => StreamingJobType::MaterializedView,
74 StreamingJob::Sink(_) => StreamingJobType::Sink,
75 StreamingJob::Table(_, _, ty) => StreamingJobType::Table(*ty),
76 StreamingJob::Index(_, _) => StreamingJobType::Index,
77 StreamingJob::Source(_) => StreamingJobType::Source,
78 }
79 }
80}
81
82#[cfg(test)]
83#[allow(clippy::derivable_impls)]
84impl Default for StreamingJobType {
85 fn default() -> Self {
86 StreamingJobType::MaterializedView
89 }
90}
91
92impl StreamingJob {
97 pub fn set_table_vnode_count(&mut self, vnode_count: usize) {
99 match self {
100 Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
101 table.maybe_vnode_count = Some(vnode_count as u32);
102 }
103 Self::Sink(_) | Self::Source(_) => {}
104 }
105 }
106
107 pub fn set_info_from_graph(&mut self, graph: &StreamFragmentGraph) {
109 match self {
110 Self::Table(_, table, ..) => {
111 table.fragment_id = graph.table_fragment_id();
112 table.dml_fragment_id = graph.dml_fragment_id();
113 }
114 Self::MaterializedView(table) | Self::Index(_, table) => {
115 table.fragment_id = graph.table_fragment_id();
116 }
117 Self::Sink(_) | Self::Source(_) => {}
118 }
119 }
120
121 pub fn id(&self) -> JobId {
122 match self {
123 Self::MaterializedView(table) => table.id,
124 Self::Sink(sink) => sink.id,
125 Self::Table(_, table, ..) => table.id,
126 Self::Index(index, _) => index.id,
127 Self::Source(source) => source.id,
128 }
129 .into()
130 }
131
132 pub fn mv_table(&self) -> Option<u32> {
133 match self {
134 Self::MaterializedView(table) => Some(table.id),
135 Self::Sink(_) => None,
136 Self::Table(_, table, ..) => Some(table.id),
137 Self::Index(_, table) => Some(table.id),
138 Self::Source(_) => None,
139 }
140 }
141
142 pub fn table(&self) -> Option<&Table> {
144 match self {
145 Self::MaterializedView(table) | Self::Index(_, table) | Self::Table(_, table, ..) => {
146 Some(table)
147 }
148 Self::Sink(_) | Self::Source(_) => None,
149 }
150 }
151
152 pub fn schema_id(&self) -> u32 {
153 match self {
154 Self::MaterializedView(table) => table.schema_id,
155 Self::Sink(sink) => sink.schema_id,
156 Self::Table(_, table, ..) => table.schema_id,
157 Self::Index(index, _) => index.schema_id,
158 Self::Source(source) => source.schema_id,
159 }
160 }
161
162 pub fn database_id(&self) -> u32 {
163 match self {
164 Self::MaterializedView(table) => table.database_id,
165 Self::Sink(sink) => sink.database_id,
166 Self::Table(_, table, ..) => table.database_id,
167 Self::Index(index, _) => index.database_id,
168 Self::Source(source) => source.database_id,
169 }
170 }
171
172 pub fn name(&self) -> String {
173 match self {
174 Self::MaterializedView(table) => table.name.clone(),
175 Self::Sink(sink) => sink.name.clone(),
176 Self::Table(_, table, ..) => table.name.clone(),
177 Self::Index(index, _) => index.name.clone(),
178 Self::Source(source) => source.name.clone(),
179 }
180 }
181
182 pub fn owner(&self) -> u32 {
183 match self {
184 StreamingJob::MaterializedView(mv) => mv.owner,
185 StreamingJob::Sink(sink) => sink.owner,
186 StreamingJob::Table(_, table, ..) => table.owner,
187 StreamingJob::Index(index, _) => index.owner,
188 StreamingJob::Source(source) => source.owner,
189 }
190 }
191
192 pub fn job_type(&self) -> StreamingJobType {
193 self.into()
194 }
195
196 pub fn job_type_str(&self) -> &'static str {
197 match self {
198 StreamingJob::MaterializedView(_) => "materialized view",
199 StreamingJob::Sink(_) => "sink",
200 StreamingJob::Table(_, _, _) => "table",
201 StreamingJob::Index(_, _) => "index",
202 StreamingJob::Source(_) => "source",
203 }
204 }
205
206 pub fn definition(&self) -> String {
207 match self {
208 Self::MaterializedView(table) => table.definition.clone(),
209 Self::Table(_, table, ..) => table.definition.clone(),
210 Self::Index(_, table) => table.definition.clone(),
211 Self::Sink(sink) => sink.definition.clone(),
212 Self::Source(source) => source.definition.clone(),
213 }
214 }
215
216 pub fn object_type(&self) -> ObjectType {
217 match self {
218 Self::MaterializedView(_) => ObjectType::Table, Self::Sink(_) => ObjectType::Sink,
220 Self::Table(_, _, _) => ObjectType::Table,
221 Self::Index(_, _) => ObjectType::Index,
222 Self::Source(_) => ObjectType::Source,
223 }
224 }
225
226 pub fn table_version_id(&self) -> Option<TableVersionId> {
228 if let Self::Table(_, table, ..) = self {
229 Some(
230 table
231 .get_version()
232 .expect("table must be versioned")
233 .version,
234 )
235 } else {
236 None
237 }
238 }
239
240 pub fn create_type(&self) -> CreateType {
241 match self {
242 Self::MaterializedView(table) => {
243 table.get_create_type().unwrap_or(CreateType::Foreground)
244 }
245 Self::Sink(s) => s.get_create_type().unwrap_or(CreateType::Foreground),
246 Self::Index(index, _) => {
247 CreateType::try_from(index.create_type).unwrap_or(CreateType::Foreground)
248 }
249 _ => CreateType::Foreground,
250 }
251 }
252
253 pub fn dependent_connection_ids(&self) -> MetaResult<HashSet<u32>> {
254 match self {
255 StreamingJob::Source(source) => Ok(get_referred_connection_ids_from_source(source)),
256 StreamingJob::Table(source, _, _) => {
257 if let Some(source) = source {
258 Ok(get_referred_connection_ids_from_source(source))
259 } else {
260 Ok(HashSet::new())
261 }
262 }
263 StreamingJob::Sink(sink) => Ok(get_referred_connection_ids_from_sink(sink)),
264 StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
265 }
266 }
267
268 pub fn dependent_secret_ids(&self) -> MetaResult<HashSet<u32>> {
270 match self {
271 StreamingJob::Sink(sink) => Ok(get_referred_secret_ids_from_sink(sink)),
272 StreamingJob::Table(source, _, _) => {
273 if let Some(source) = source {
274 get_referred_secret_ids_from_source(source)
275 } else {
276 Ok(HashSet::new())
277 }
278 }
279 StreamingJob::Source(source) => get_referred_secret_ids_from_source(source),
280 StreamingJob::MaterializedView(_) | StreamingJob::Index(_, _) => Ok(HashSet::new()),
281 }
282 }
283
284 pub async fn verify_version_for_replace(&self, txn: &DatabaseTransaction) -> MetaResult<()> {
286 let id = self.id();
287
288 match self {
289 StreamingJob::Table(_source, table, _table_job_type) => {
290 let new_version = table.get_version()?.get_version();
291 let original_version: Option<TableVersion> =
292 TableModel::find_by_id(id.as_mv_table_id())
293 .select_only()
294 .column(table::Column::Version)
295 .into_tuple()
296 .one(txn)
297 .await?
298 .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
299 let original_version = original_version
300 .expect("version for table should exist")
301 .to_protobuf();
302 if new_version != original_version.version + 1 {
303 return Err(MetaError::permission_denied("table version is stale"));
304 }
305 }
306 StreamingJob::Source(source) => {
307 let new_version = source.get_version();
308 let original_version: Option<i64> =
309 SourceModel::find_by_id(id.as_raw_id() as SourceId)
310 .select_only()
311 .column(source::Column::Version)
312 .into_tuple()
313 .one(txn)
314 .await?
315 .ok_or_else(|| MetaError::catalog_id_not_found(self.job_type_str(), id))?;
316 let original_version = original_version.expect("version for source should exist");
317 if new_version != original_version as u64 + 1 {
318 return Err(MetaError::permission_denied("source version is stale"));
319 }
320 }
321 StreamingJob::MaterializedView(_) => {
322 }
325 StreamingJob::Sink(_) => {
326 }
328 StreamingJob::Index(_, _) => {
329 bail_not_implemented!("schema change for {}", self.job_type_str())
330 }
331 }
332 Ok(())
333 }
334
335 pub fn should_notify_creating(&self) -> bool {
337 self.is_materialized_view() || matches!(self.create_type(), CreateType::Background)
338 }
339
340 pub fn is_sink_into_table(&self) -> bool {
341 matches!(self, Self::Sink(sink) if sink.target_table.is_some())
342 }
343}