risingwave_meta/controller/catalog/
drop_op.rs1use risingwave_common::catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
16use risingwave_pb::catalog::PbTable;
17use risingwave_pb::catalog::subscription::PbSubscriptionState;
18use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
19use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter};
20
21use super::*;
22impl CatalogController {
23 pub async fn drop_object(
26 &self,
27 object_type: ObjectType,
28 object_id: impl Into<ObjectId>,
29 drop_mode: DropMode,
30 ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
31 let object_id = object_id.into();
32 let mut inner = self.inner.write().await;
33 let txn = inner.db.begin().await?;
34
35 let obj: PartialObject = Object::find_by_id(object_id)
36 .into_partial_model()
37 .one(&txn)
38 .await?
39 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
40 assert_eq!(obj.obj_type, object_type);
41 let drop_database = object_type == ObjectType::Database;
42 let database_id = if object_type == ObjectType::Database {
43 object_id.as_database_id()
44 } else {
45 obj.database_id
46 .ok_or_else(|| anyhow!("dropped object should have database_id"))?
47 };
48
49 if obj.obj_type == ObjectType::Subscription {
51 validate_subscription_deletion(&txn, object_id.as_subscription_id()).await?;
52 }
53
54 let mut removed_objects = match drop_mode {
55 DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
56 DropMode::Restrict => match object_type {
57 ObjectType::Database => unreachable!("database always be dropped in cascade mode"),
58 ObjectType::Schema => {
59 ensure_schema_empty(object_id.as_schema_id(), &txn).await?;
60 Default::default()
61 }
62 ObjectType::Table => {
63 check_object_refer_for_drop(object_type, object_id, &txn).await?;
64 let objects = get_referring_objects(object_id, &txn).await?;
65 for obj in objects.iter().filter(|object| {
66 object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
67 }) {
68 report_drop_object(obj.obj_type, obj.oid, &txn).await;
69 }
70 assert!(
71 objects.iter().all(|obj| obj.obj_type == ObjectType::Index
72 || obj.obj_type == ObjectType::Sink),
73 "only index and iceberg sink could be dropped in restrict mode"
74 );
75 for obj in &objects {
76 check_object_refer_for_drop(obj.obj_type, obj.oid, &txn).await?;
77 }
78 objects
79 }
80 object_type @ (ObjectType::Source | ObjectType::Sink) => {
81 check_object_refer_for_drop(object_type, object_id, &txn).await?;
82 report_drop_object(object_type, object_id, &txn).await;
83 vec![]
84 }
85
86 ObjectType::View
87 | ObjectType::Index
88 | ObjectType::Function
89 | ObjectType::Connection
90 | ObjectType::Subscription
91 | ObjectType::Secret => {
92 check_object_refer_for_drop(object_type, object_id, &txn).await?;
93 vec![]
94 }
95 },
96 };
97
98 if obj.obj_type == ObjectType::Table {
100 let table_name = Table::find_by_id(object_id.as_table_id())
101 .select_only()
102 .column(table::Column::Name)
103 .into_tuple::<String>()
104 .one(&txn)
105 .await?
106 .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
107 let iceberg_source = Source::find()
108 .inner_join(Object)
109 .filter(
110 object::Column::DatabaseId
111 .eq(database_id)
112 .and(object::Column::SchemaId.eq(obj.schema_id.unwrap()))
113 .and(
114 source::Column::Name
115 .eq(format!("{}{}", ICEBERG_SOURCE_PREFIX, table_name)),
116 ),
117 )
118 .into_partial_model()
119 .one(&txn)
120 .await?;
121 if let Some(iceberg_source) = iceberg_source {
122 removed_objects.push(iceberg_source);
123 }
124 }
125
126 removed_objects.push(obj);
127 let mut removed_object_ids: HashSet<_> =
128 removed_objects.iter().map(|obj| obj.oid).collect();
129
130 let incoming_sink_ids: Vec<SinkId> = Sink::find()
133 .select_only()
134 .column(sink::Column::SinkId)
135 .filter(sink::Column::TargetTable.is_in(removed_object_ids.clone()))
136 .into_tuple()
137 .all(&txn)
138 .await?;
139 if !incoming_sink_ids.is_empty() {
140 if self.env.opts.protect_drop_table_with_incoming_sink {
141 let sink_names: Vec<String> = Sink::find()
142 .select_only()
143 .column(sink::Column::Name)
144 .filter(sink::Column::SinkId.is_in(incoming_sink_ids.clone()))
145 .into_tuple()
146 .all(&txn)
147 .await?;
148
149 return Err(MetaError::permission_denied(format!(
150 "Table used by incoming sinks: {:?}, please drop them manually",
151 sink_names
152 )));
153 }
154
155 let removed_sink_objs: Vec<PartialObject> = Object::find()
156 .filter(object::Column::Oid.is_in(incoming_sink_ids))
157 .into_partial_model()
158 .all(&txn)
159 .await?;
160
161 removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
162 removed_objects.extend(removed_sink_objs);
163 }
164
165 for obj in &removed_objects {
166 if obj.obj_type == ObjectType::Sink {
167 let sink = Sink::find_by_id(obj.oid.as_sink_id())
168 .one(&txn)
169 .await?
170 .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;
171
172 if let Some(target_table) = sink.target_table
173 && !removed_object_ids.contains(&target_table.as_object_id())
174 && !has_table_been_migrated(&txn, target_table).await?
175 {
176 return Err(anyhow::anyhow!(
177 "Dropping sink into table is not allowed for unmigrated table {}. Please migrate it first.",
178 target_table
179 ).into());
180 }
181 }
182 }
183
184 if object_type != ObjectType::Table || drop_database {
187 for obj in &removed_objects {
188 if obj.obj_type == ObjectType::Table {
190 let table = Table::find_by_id(obj.oid.as_table_id())
191 .one(&txn)
192 .await?
193 .ok_or_else(|| MetaError::catalog_id_not_found("table", obj.oid))?;
194 if matches!(table.engine, Some(table::Engine::Iceberg)) {
195 return Err(MetaError::permission_denied(format!(
196 "Found iceberg table in dependency: {}, please drop it manually",
197 table.name,
198 )));
199 }
200 }
201 }
202 }
203
204 let removed_table_ids = removed_objects
205 .iter()
206 .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
207 .map(|obj| obj.oid.as_table_id());
208
209 let removed_iceberg_table_sinks: Vec<PbSink> = Sink::find()
210 .find_also_related(Object)
211 .filter(
212 sink::Column::SinkId
213 .is_in(removed_object_ids.clone())
214 .and(sink::Column::Name.like(format!("{}%", ICEBERG_SINK_PREFIX))),
215 )
216 .all(&txn)
217 .await?
218 .into_iter()
219 .map(|(sink, obj)| ObjectModel(sink, obj.unwrap(), None).into())
220 .collect();
221
222 let removed_iceberg_v3_sink_ids: Vec<SinkId> = Sink::find()
227 .filter(sink::Column::SinkId.is_in(removed_object_ids.clone()))
228 .all(&txn)
229 .await?
230 .into_iter()
231 .filter_map(|sink| {
232 crate::manager::iceberg_v3_sink::is_iceberg_v3_sink(sink.properties.inner_ref())
233 .then_some(sink.sink_id)
234 })
235 .collect();
236
237 let removed_streaming_job_ids: Vec<JobId> = StreamingJob::find()
238 .select_only()
239 .column(streaming_job::Column::JobId)
240 .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
241 .into_tuple()
242 .all(&txn)
243 .await?;
244
245 if !removed_streaming_job_ids.is_empty() {
247 let creating = StreamingJob::find()
248 .filter(
249 streaming_job::Column::JobStatus
250 .ne(JobStatus::Created)
251 .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
252 )
253 .count(&txn)
254 .await?;
255 if creating != 0 {
256 if creating == 1 && object_type == ObjectType::Sink {
257 info!("dropping creating sink job, it will be cancelled");
258 } else {
259 return Err(MetaError::permission_denied(format!(
260 "can not drop {creating} creating streaming job, please cancel them firstly"
261 )));
262 }
263 }
264 }
265
266 let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();
267
268 if !drop_database {
269 let removed_source_ids: Vec<SourceId> = Table::find()
271 .select_only()
272 .column(table::Column::OptionalAssociatedSourceId)
273 .filter(
274 table::Column::TableId
275 .is_in(removed_table_ids)
276 .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
277 )
278 .into_tuple()
279 .all(&txn)
280 .await?;
281 let removed_source_objs: Vec<PartialObject> = Object::find()
282 .filter(object::Column::Oid.is_in(removed_source_ids))
283 .into_partial_model()
284 .all(&txn)
285 .await?;
286 removed_objects.extend(removed_source_objs);
287 }
288
289 let removed_source_ids: HashSet<_> = removed_objects
290 .iter()
291 .filter(|obj| obj.obj_type == ObjectType::Source)
292 .map(|obj| obj.oid.as_source_id())
293 .collect();
294
295 let removed_secret_ids = removed_objects
296 .iter()
297 .filter(|obj| obj.obj_type == ObjectType::Secret)
298 .map(|obj| obj.oid.as_secret_id())
299 .collect();
300
301 if !removed_streaming_job_ids.is_empty() {
302 let removed_internal_table_objs: Vec<PartialObject> = Object::find()
303 .select_only()
304 .columns([
305 object::Column::Oid,
306 object::Column::ObjType,
307 object::Column::SchemaId,
308 object::Column::DatabaseId,
309 ])
310 .join(JoinType::InnerJoin, object::Relation::Table.def())
311 .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
312 .into_partial_model()
313 .all(&txn)
314 .await?;
315
316 removed_state_table_ids.extend(
317 removed_internal_table_objs
318 .iter()
319 .map(|obj| obj.oid.as_table_id()),
320 );
321 removed_objects.extend(removed_internal_table_objs);
322 }
323
324 let removed_objects: HashMap<_, _> = removed_objects
325 .into_iter()
326 .map(|obj| (obj.oid, obj))
327 .collect();
328
329 for obj in removed_objects.values() {
331 if let Some(obj_database_id) = obj.database_id
332 && obj_database_id != database_id
333 {
334 return Err(MetaError::permission_denied(format!(
335 "Referenced by other objects in database {obj_database_id}, please drop them manually"
336 )));
337 }
338 }
339
340 let (removed_source_fragments, removed_sink_fragments, removed_fragments) =
341 get_fragments_for_jobs(&txn, removed_streaming_job_ids.clone()).await?;
342
343 let sink_target_fragments = fetch_target_fragments(&txn, removed_sink_fragments).await?;
344 let mut removed_sink_fragment_by_targets = HashMap::new();
345 for (sink_fragment, target_fragments) in sink_target_fragments {
346 assert!(
347 target_fragments.len() <= 1,
348 "sink should have at most one downstream fragment"
349 );
350 if let Some(target_fragment) = target_fragments.first()
351 && !removed_fragments.contains(target_fragment)
352 {
353 removed_sink_fragment_by_targets
354 .entry(*target_fragment)
355 .or_insert_with(Vec::new)
356 .push(sink_fragment);
357 }
358 }
359
360 let updated_user_ids: Vec<UserId> = UserPrivilege::find()
362 .select_only()
363 .distinct()
364 .column(user_privilege::Column::UserId)
365 .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
366 .into_tuple()
367 .all(&txn)
368 .await?;
369 let dropped_tables = Table::find()
370 .find_also_related(Object)
371 .filter(
372 table::Column::TableId.is_in(
373 removed_state_table_ids
374 .iter()
375 .copied()
376 .collect::<HashSet<TableId>>(),
377 ),
378 )
379 .all(&txn)
380 .await?
381 .into_iter()
382 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)));
383 let res = Object::delete_many()
385 .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
386 .exec(&txn)
387 .await?;
388 if res.rows_affected == 0 {
389 return Err(MetaError::catalog_id_not_found(
390 object_type.as_str(),
391 object_id,
392 ));
393 }
394 let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;
395
396 txn.commit().await?;
397
398 self.notify_users_update(user_infos).await;
400 inner
401 .dropped_tables
402 .extend(dropped_tables.map(|t| (t.id, t)));
403
404 let version = match object_type {
405 ObjectType::Database => {
406 self.notify_frontend(
408 NotificationOperation::Delete,
409 NotificationInfo::Database(PbDatabase {
410 id: database_id,
411 ..Default::default()
412 }),
413 )
414 .await
415 }
416 ObjectType::Schema => {
417 let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
418 .into_values()
419 .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
420 let schema_obj = Itertools::exactly_one(schema_obj.into_iter())
421 .expect("schema object not found");
422 to_notify_objs.push(schema_obj);
423
424 let relation_group = build_object_group_for_delete(to_notify_objs);
425 self.notify_frontend(NotificationOperation::Delete, relation_group)
426 .await
427 }
428 _ => {
429 let relation_group =
432 build_object_group_for_delete(removed_objects.into_values().collect());
433 self.notify_frontend(NotificationOperation::Delete, relation_group)
434 .await
435 }
436 };
437
438 Ok((
439 ReleaseContext {
440 database_id,
441 removed_streaming_job_ids,
442 removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
443 removed_source_ids: removed_source_ids.into_iter().collect(),
444 removed_secret_ids,
445 removed_source_fragments,
446 removed_fragments,
447 removed_sink_fragment_by_targets,
448 removed_iceberg_table_sinks,
449 removed_iceberg_v3_sink_ids,
450 },
451 version,
452 ))
453 }
454
455 pub async fn try_abort_creating_subscription(
456 &self,
457 subscription_id: SubscriptionId,
458 ) -> MetaResult<()> {
459 let inner = self.inner.write().await;
460 let txn = inner.db.begin().await?;
461
462 let subscription = Subscription::find_by_id(subscription_id).one(&txn).await?;
463 let Some(subscription) = subscription else {
464 tracing::warn!(
465 %subscription_id,
466 "subscription not found when aborting creation, might be cleaned by recovery"
467 );
468 return Ok(());
469 };
470
471 if subscription.subscription_state == PbSubscriptionState::Created as i32 {
472 tracing::warn!(
473 %subscription_id,
474 "subscription is already created when aborting creation"
475 );
476 return Ok(());
477 }
478
479 subscription.delete(&txn).await?;
480 txn.commit().await?;
481 Ok(())
482 }
483}
484
485async fn report_drop_object(
486 object_type: ObjectType,
487 object_id: ObjectId,
488 txn: &DatabaseTransaction,
489) {
490 let connector_name = {
491 match object_type {
492 ObjectType::Sink => Sink::find_by_id(object_id.as_sink_id())
493 .select_only()
494 .column(sink::Column::Properties)
495 .into_tuple::<Property>()
496 .one(txn)
497 .await
498 .ok()
499 .flatten()
500 .and_then(|properties| properties.inner_ref().get("connector").cloned()),
501 ObjectType::Source => Source::find_by_id(object_id.as_source_id())
502 .select_only()
503 .column(source::Column::WithProperties)
504 .into_tuple::<Property>()
505 .one(txn)
506 .await
507 .ok()
508 .flatten()
509 .and_then(|properties| properties.inner_ref().get("connector").cloned()),
510 _ => unreachable!(),
511 }
512 };
513 if let Some(connector_name) = connector_name {
514 report_event(
515 PbTelemetryEventStage::DropStreamJob,
516 "source",
517 object_id.as_raw_id() as _,
518 Some(connector_name),
519 Some(match object_type {
520 ObjectType::Source => PbTelemetryDatabaseObject::Source,
521 ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
522 _ => unreachable!(),
523 }),
524 None,
525 );
526 }
527}