risingwave_meta/controller/catalog/
drop_op.rs1use risingwave_common::catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
16use risingwave_pb::catalog::PbTable;
17use risingwave_pb::catalog::subscription::PbSubscriptionState;
18use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
19use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter};
20
21use super::*;
impl CatalogController {
    /// Drops a catalog object (database, schema, table, source, sink, index, view,
    /// function, connection, subscription or secret) together with everything that
    /// must go with it, inside a single database transaction.
    ///
    /// Behavior by `drop_mode`:
    /// - `Cascade`: all transitively referring objects are collected and dropped too.
    /// - `Restrict`: fails if the object is still referenced, with special cases for
    ///   schemas (must be empty) and tables (referring indexes/iceberg sinks are
    ///   allowed and dropped along).
    ///
    /// Returns the `ReleaseContext` describing everything that was removed (for the
    /// caller to release runtime resources) plus the notification version produced
    /// by the frontend notification.
    ///
    /// Errors include: object not found, permission-denied style errors when the
    /// drop would break dependencies (incoming sinks, iceberg tables, cross-database
    /// references, still-creating streaming jobs).
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: impl Into<ObjectId>,
        drop_mode: DropMode,
    ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
        let object_id = object_id.into();
        // Hold the catalog write lock for the whole drop; all reads/writes below
        // go through one transaction so the drop is atomic.
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj: PartialObject = Object::find_by_id(object_id)
            .into_partial_model()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
        // The caller-supplied type must match what the catalog actually stores.
        assert_eq!(obj.obj_type, object_type);
        let drop_database = object_type == ObjectType::Database;
        // For a database the object itself is the database; everything else must
        // record which database it belongs to.
        let database_id = if object_type == ObjectType::Database {
            object_id.as_database_id()
        } else {
            obj.database_id
                .ok_or_else(|| anyhow!("dropped object should have database_id"))?
        };

        if obj.obj_type == ObjectType::Subscription {
            validate_subscription_deletion(&txn, object_id.as_subscription_id()).await?;
        }

        // Collect the set of objects to remove (excluding `obj` itself for now).
        let mut removed_objects = match drop_mode {
            DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
            DropMode::Restrict => match object_type {
                ObjectType::Database => unreachable!("database always be dropped in cascade mode"),
                ObjectType::Schema => {
                    ensure_schema_empty(object_id.as_schema_id(), &txn).await?;
                    Default::default()
                }
                ObjectType::Table => {
                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
                    let objects = get_referring_objects(object_id, &txn).await?;
                    // Best-effort telemetry for sources/sinks dropped alongside
                    // the table (reported before the assert below; see assert).
                    for obj in objects.iter().filter(|object| {
                        object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
                    }) {
                        report_drop_object(obj.obj_type, obj.oid, &txn).await;
                    }
                    assert!(
                        objects.iter().all(|obj| obj.obj_type == ObjectType::Index
                            || obj.obj_type == ObjectType::Sink),
                        "only index and iceberg sink could be dropped in restrict mode"
                    );
                    // Each implicitly-dropped referrer must itself be droppable.
                    for obj in &objects {
                        check_object_refer_for_drop(obj.obj_type, obj.oid, &txn).await?;
                    }
                    objects
                }
                object_type @ (ObjectType::Source | ObjectType::Sink) => {
                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
                    report_drop_object(object_type, object_id, &txn).await;
                    vec![]
                }

                ObjectType::View
                | ObjectType::Index
                | ObjectType::Function
                | ObjectType::Connection
                | ObjectType::Subscription
                | ObjectType::Secret => {
                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
                    vec![]
                }
            },
        };

        // A table may have a hidden companion iceberg source named
        // `{ICEBERG_SOURCE_PREFIX}{table_name}` in the same schema; drop it too.
        if obj.obj_type == ObjectType::Table {
            let table_name = Table::find_by_id(object_id.as_table_id())
                .select_only()
                .column(table::Column::Name)
                .into_tuple::<String>()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
            let iceberg_source = Source::find()
                .inner_join(Object)
                .filter(
                    object::Column::DatabaseId
                        .eq(database_id)
                        // NOTE: tables are expected to always have a schema_id.
                        .and(object::Column::SchemaId.eq(obj.schema_id.unwrap()))
                        .and(
                            source::Column::Name
                                .eq(format!("{}{}", ICEBERG_SOURCE_PREFIX, table_name)),
                        ),
                )
                .into_partial_model()
                .one(&txn)
                .await?;
            if let Some(iceberg_source) = iceberg_source {
                removed_objects.push(iceberg_source);
            }
        }

        removed_objects.push(obj);
        let mut removed_object_ids: HashSet<_> =
            removed_objects.iter().map(|obj| obj.oid).collect();

        // Sinks that write INTO a table being removed ("sink into table") are
        // dropped along with it — unless the cluster is configured to protect
        // such tables, in which case the drop is rejected.
        let incoming_sink_ids: Vec<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.is_in(removed_object_ids.clone()))
            .into_tuple()
            .all(&txn)
            .await?;
        if !incoming_sink_ids.is_empty() {
            if self.env.opts.protect_drop_table_with_incoming_sink {
                let sink_names: Vec<String> = Sink::find()
                    .select_only()
                    .column(sink::Column::Name)
                    .filter(sink::Column::SinkId.is_in(incoming_sink_ids.clone()))
                    .into_tuple()
                    .all(&txn)
                    .await?;

                return Err(MetaError::permission_denied(format!(
                    "Table used by incoming sinks: {:?}, please drop them manually",
                    sink_names
                )));
            }

            let removed_sink_objs: Vec<PartialObject> = Object::find()
                .filter(object::Column::Oid.is_in(incoming_sink_ids))
                .into_partial_model()
                .all(&txn)
                .await?;

            removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
            removed_objects.extend(removed_sink_objs);
        }

        // Refuse to drop a sink-into-table whose target table survives the drop
        // but has not been migrated yet.
        for obj in &removed_objects {
            if obj.obj_type == ObjectType::Sink {
                let sink = Sink::find_by_id(obj.oid.as_sink_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;

                if let Some(target_table) = sink.target_table
                    && !removed_object_ids.contains(&target_table.as_object_id())
                    && !has_table_been_migrated(&txn, target_table).await?
                {
                    return Err(anyhow::anyhow!(
                        "Dropping sink into table is not allowed for unmigrated table {}. Please migrate it first.",
                        target_table
                    ).into());
                }
            }
        }

        // Dropping an iceberg table indirectly (via cascade from a non-table
        // object, or via dropping a whole database) is not allowed; the user
        // must drop it explicitly first.
        if object_type != ObjectType::Table || drop_database {
            for obj in &removed_objects {
                if obj.obj_type == ObjectType::Table {
                    let table = Table::find_by_id(obj.oid.as_table_id())
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("table", obj.oid))?;
                    if matches!(table.engine, Some(table::Engine::Iceberg)) {
                        return Err(MetaError::permission_denied(format!(
                            "Found iceberg table in dependency: {}, please drop it manually",
                            table.name,
                        )));
                    }
                }
            }
        }

        // Lazy iterator over table/index ids; consumed (or cloned) below.
        let removed_table_ids = removed_objects
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
            .map(|obj| obj.oid.as_table_id());

        // Full models of the iceberg-table sinks being removed, identified by
        // their reserved name prefix; returned in the ReleaseContext.
        let removed_iceberg_table_sinks: Vec<PbSink> = Sink::find()
            .find_also_related(Object)
            .filter(
                sink::Column::SinkId
                    .is_in(removed_object_ids.clone())
                    .and(sink::Column::Name.like(format!("{}%", ICEBERG_SINK_PREFIX))),
            )
            .all(&txn)
            .await?
            .into_iter()
            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
            .collect();

        let removed_streaming_job_ids: Vec<JobId> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        // Jobs still being created generally can't be dropped — except the
        // special case of dropping exactly the one creating sink, which is
        // treated as a cancel.
        if !removed_streaming_job_ids.is_empty() {
            let creating = StreamingJob::find()
                .filter(
                    streaming_job::Column::JobStatus
                        .ne(JobStatus::Created)
                        .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
                )
                .count(&txn)
                .await?;
            if creating != 0 {
                if creating == 1 && object_type == ObjectType::Sink {
                    info!("dropping creating sink job, it will be cancelled");
                } else {
                    return Err(MetaError::permission_denied(format!(
                        "can not drop {creating} creating streaming job, please cancel them firstly"
                    )));
                }
            }
        }

        // State tables to release: all removed tables/indexes, plus internal
        // tables of removed streaming jobs (added further below).
        let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();

        if !drop_database {
            // Also remove the sources associated with removed tables
            // (tables created via an associated source). When dropping a whole
            // database the cascade set already covers them.
            let removed_source_ids: Vec<SourceId> = Table::find()
                .select_only()
                .column(table::Column::OptionalAssociatedSourceId)
                .filter(
                    table::Column::TableId
                        .is_in(removed_table_ids)
                        .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
                )
                .into_tuple()
                .all(&txn)
                .await?;
            let removed_source_objs: Vec<PartialObject> = Object::find()
                .filter(object::Column::Oid.is_in(removed_source_ids))
                .into_partial_model()
                .all(&txn)
                .await?;
            removed_objects.extend(removed_source_objs);
        }

        let removed_source_ids: HashSet<_> = removed_objects
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Source)
            .map(|obj| obj.oid.as_source_id())
            .collect();

        let removed_secret_ids = removed_objects
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Secret)
            .map(|obj| obj.oid.as_secret_id())
            .collect_vec();

        if !removed_streaming_job_ids.is_empty() {
            // Internal (state) tables belong to their streaming job, not to the
            // dependency graph, so fetch them explicitly.
            let removed_internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
                .into_partial_model()
                .all(&txn)
                .await?;

            removed_state_table_ids.extend(
                removed_internal_table_objs
                    .iter()
                    .map(|obj| obj.oid.as_table_id()),
            );
            removed_objects.extend(removed_internal_table_objs);
        }

        // Deduplicate by oid from here on.
        let removed_objects: HashMap<_, _> = removed_objects
            .into_iter()
            .map(|obj| (obj.oid, obj))
            .collect();

        // Cross-database references cannot be dropped implicitly.
        for obj in removed_objects.values() {
            if let Some(obj_database_id) = obj.database_id
                && obj_database_id != database_id
            {
                return Err(MetaError::permission_denied(format!(
                    "Referenced by other objects in database {obj_database_id}, please drop them manually"
                )));
            }
        }

        let (removed_source_fragments, removed_sink_fragments, removed_actors, removed_fragments) =
            get_fragments_for_jobs(
                &txn,
                self.env.shared_actor_infos(),
                removed_streaming_job_ids.clone(),
            )
            .await?;

        // For each removed sink fragment whose downstream (target) fragment
        // survives the drop, record the mapping so the caller can detach it.
        let sink_target_fragments = fetch_target_fragments(&txn, removed_sink_fragments).await?;
        let mut removed_sink_fragment_by_targets = HashMap::new();
        for (sink_fragment, target_fragments) in sink_target_fragments {
            assert!(
                target_fragments.len() <= 1,
                "sink should have at most one downstream fragment"
            );
            if let Some(target_fragment) = target_fragments.first()
                && !removed_fragments.contains(target_fragment)
            {
                removed_sink_fragment_by_targets
                    .entry(*target_fragment)
                    .or_insert_with(Vec::new)
                    .push(sink_fragment);
            }
        }

        // Users holding privileges on any removed object need a refreshed
        // user-info notification after the delete.
        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
            .select_only()
            .distinct()
            .column(user_privilege::Column::UserId)
            .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
            .into_tuple()
            .all(&txn)
            .await?;
        // NOTE: lazy iterator — the rows are fetched here (inside the txn) but
        // mapping into PbTable happens when consumed after commit.
        let dropped_tables = Table::find()
            .find_also_related(Object)
            .filter(
                table::Column::TableId.is_in(
                    removed_state_table_ids
                        .iter()
                        .copied()
                        .collect::<HashSet<TableId>>(),
                ),
            )
            .all(&txn)
            .await?
            .into_iter()
            .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
        // Deleting the `object` rows cascades to the per-type tables via FK.
        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found(
                object_type.as_str(),
                object_id,
            ));
        }
        let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;

        txn.commit().await?;

        // Post-commit side effects: notify users, stash dropped tables for
        // later cleanup, then notify the frontend of the catalog deletions.
        self.notify_users_update(user_infos).await;
        inner
            .dropped_tables
            .extend(dropped_tables.map(|t| (t.id, t)));

        let version = match object_type {
            ObjectType::Database => {
                // Dropping a database only needs the database id in the
                // notification; members are implied.
                self.notify_frontend(
                    NotificationOperation::Delete,
                    NotificationInfo::Database(PbDatabase {
                        id: database_id,
                        ..Default::default()
                    }),
                )
                .await
            }
            ObjectType::Schema => {
                // Move the schema object itself to the end of the notification
                // list so members are deleted before their schema.
                let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
                    .into_values()
                    .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
                let schema_obj = schema_obj
                    .into_iter()
                    .exactly_one()
                    .expect("schema object not found");
                to_notify_objs.push(schema_obj);

                let relation_group = build_object_group_for_delete(to_notify_objs);
                self.notify_frontend(NotificationOperation::Delete, relation_group)
                    .await
            }
            _ => {
                let relation_group =
                    build_object_group_for_delete(removed_objects.into_values().collect());
                self.notify_frontend(NotificationOperation::Delete, relation_group)
                    .await
            }
        };

        Ok((
            ReleaseContext {
                database_id,
                removed_streaming_job_ids,
                removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
                removed_source_ids: removed_source_ids.into_iter().collect(),
                removed_secret_ids,
                removed_source_fragments,
                removed_actors,
                removed_fragments,
                removed_sink_fragment_by_targets,
                removed_iceberg_table_sinks,
            },
            version,
        ))
    }

    /// Aborts a subscription whose creation is in progress by deleting its
    /// catalog row. A no-op (Ok) if the subscription no longer exists (e.g.
    /// already cleaned up by recovery) or has already reached the `Created`
    /// state — in the latter case it is left untouched.
    pub async fn try_abort_creating_subscription(
        &self,
        subscription_id: SubscriptionId,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let subscription = Subscription::find_by_id(subscription_id).one(&txn).await?;
        let Some(subscription) = subscription else {
            tracing::warn!(
                %subscription_id,
                "subscription not found when aborting creation, might be cleaned by recovery"
            );
            return Ok(());
        };

        if subscription.subscription_state == PbSubscriptionState::Created as i32 {
            tracing::warn!(
                %subscription_id,
                "subscription is already created when aborting creation"
            );
            return Ok(());
        }

        subscription.delete(&txn).await?;
        Ok(())
    }
}
475
476async fn report_drop_object(
477 object_type: ObjectType,
478 object_id: ObjectId,
479 txn: &DatabaseTransaction,
480) {
481 let connector_name = {
482 match object_type {
483 ObjectType::Sink => Sink::find_by_id(object_id.as_sink_id())
484 .select_only()
485 .column(sink::Column::Properties)
486 .into_tuple::<Property>()
487 .one(txn)
488 .await
489 .ok()
490 .flatten()
491 .and_then(|properties| properties.inner_ref().get("connector").cloned()),
492 ObjectType::Source => Source::find_by_id(object_id.as_source_id())
493 .select_only()
494 .column(source::Column::WithProperties)
495 .into_tuple::<Property>()
496 .one(txn)
497 .await
498 .ok()
499 .flatten()
500 .and_then(|properties| properties.inner_ref().get("connector").cloned()),
501 _ => unreachable!(),
502 }
503 };
504 if let Some(connector_name) = connector_name {
505 report_event(
506 PbTelemetryEventStage::DropStreamJob,
507 "source",
508 object_id.as_raw_id() as _,
509 Some(connector_name),
510 Some(match object_type {
511 ObjectType::Source => PbTelemetryDatabaseObject::Source,
512 ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
513 _ => unreachable!(),
514 }),
515 None,
516 );
517 }
518}