risingwave_meta/controller/catalog/
drop_op.rs1use risingwave_pb::catalog::PbTable;
16use risingwave_pb::catalog::subscription::PbSubscriptionState;
17use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
18use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter};
19
20use super::*;
21impl CatalogController {
22 pub async fn drop_object(
25 &self,
26 object_type: ObjectType,
27 object_id: ObjectId,
28 drop_mode: DropMode,
29 ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
30 let mut inner = self.inner.write().await;
31 let txn = inner.db.begin().await?;
32
33 let obj: PartialObject = Object::find_by_id(object_id)
34 .into_partial_model()
35 .one(&txn)
36 .await?
37 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
38 assert_eq!(obj.obj_type, object_type);
39 let drop_database = object_type == ObjectType::Database;
40 let database_id = if object_type == ObjectType::Database {
41 object_id
42 } else {
43 obj.database_id
44 .ok_or_else(|| anyhow!("dropped object should have database_id"))?
45 };
46
47 if obj.obj_type == ObjectType::Subscription {
49 validate_subscription_deletion(&txn, object_id).await?;
50 }
51
52 let mut removed_objects = match drop_mode {
53 DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
54 DropMode::Restrict => match object_type {
55 ObjectType::Database => unreachable!("database always be dropped in cascade mode"),
56 ObjectType::Schema => {
57 ensure_schema_empty(object_id, &txn).await?;
58 Default::default()
59 }
60 ObjectType::Table => {
61 check_object_refer_for_drop(object_type, object_id, &txn).await?;
62 let indexes = get_referring_objects(object_id, &txn).await?;
63 for obj in indexes.iter().filter(|object| {
64 object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
65 }) {
66 report_drop_object(obj.obj_type, obj.oid, &txn).await;
67 }
68 assert!(
69 indexes.iter().all(|obj| obj.obj_type == ObjectType::Index),
70 "only index could be dropped in restrict mode"
71 );
72 indexes
73 }
74 object_type @ (ObjectType::Source | ObjectType::Sink) => {
75 check_object_refer_for_drop(object_type, object_id, &txn).await?;
76 report_drop_object(object_type, object_id, &txn).await;
77 vec![]
78 }
79
80 ObjectType::View
81 | ObjectType::Index
82 | ObjectType::Function
83 | ObjectType::Connection
84 | ObjectType::Subscription
85 | ObjectType::Secret => {
86 check_object_refer_for_drop(object_type, object_id, &txn).await?;
87 vec![]
88 }
89 },
90 };
91 removed_objects.push(obj);
92 let mut removed_object_ids: HashSet<_> =
93 removed_objects.iter().map(|obj| obj.oid).collect();
94
95 let removed_incoming_sinks: Vec<I32Array> = Table::find()
98 .select_only()
99 .column(table::Column::IncomingSinks)
100 .filter(table::Column::TableId.is_in(removed_object_ids.clone()))
101 .into_tuple()
102 .all(&txn)
103 .await?;
104 if !removed_incoming_sinks.is_empty() {
105 let incoming_sink_ids = removed_incoming_sinks
106 .into_iter()
107 .flat_map(|arr| arr.into_inner().into_iter())
108 .collect_vec();
109
110 if self.env.opts.protect_drop_table_with_incoming_sink {
111 let sink_names: Vec<String> = Sink::find()
112 .select_only()
113 .column(sink::Column::Name)
114 .filter(sink::Column::SinkId.is_in(incoming_sink_ids.clone()))
115 .into_tuple()
116 .all(&txn)
117 .await?;
118
119 return Err(MetaError::permission_denied(format!(
120 "Table used by incoming sinks: {:?}, please drop them manually",
121 sink_names
122 )));
123 }
124
125 let removed_sink_objs: Vec<PartialObject> = Object::find()
126 .filter(object::Column::Oid.is_in(incoming_sink_ids))
127 .into_partial_model()
128 .all(&txn)
129 .await?;
130
131 removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
132 removed_objects.extend(removed_sink_objs);
133 }
134
135 if object_type != ObjectType::Sink {
137 for obj in &removed_objects {
138 if obj.obj_type == ObjectType::Sink {
139 let sink = Sink::find_by_id(obj.oid)
140 .one(&txn)
141 .await?
142 .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;
143
144 if let Some(target_table) = sink.target_table
146 && !removed_object_ids.contains(&target_table)
147 {
148 return Err(MetaError::permission_denied(format!(
149 "Found sink into table in dependency: {}, please drop it manually",
150 sink.name,
151 )));
152 }
153 }
154 }
155 }
156
157 if object_type != ObjectType::Table || drop_database {
160 for obj in &removed_objects {
161 if obj.obj_type == ObjectType::Table {
163 let table = Table::find_by_id(obj.oid)
164 .one(&txn)
165 .await?
166 .ok_or_else(|| MetaError::catalog_id_not_found("table", obj.oid))?;
167 if matches!(table.engine, Some(table::Engine::Iceberg)) {
168 return Err(MetaError::permission_denied(format!(
169 "Found iceberg table in dependency: {}, please drop it manually",
170 table.name,
171 )));
172 }
173 }
174 }
175 }
176
177 let removed_table_ids = removed_objects
178 .iter()
179 .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
180 .map(|obj| obj.oid);
181
182 let removed_streaming_job_ids: Vec<ObjectId> = StreamingJob::find()
183 .select_only()
184 .column(streaming_job::Column::JobId)
185 .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
186 .into_tuple()
187 .all(&txn)
188 .await?;
189
190 if !removed_streaming_job_ids.is_empty() {
192 let creating = StreamingJob::find()
193 .filter(
194 streaming_job::Column::JobStatus
195 .ne(JobStatus::Created)
196 .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
197 )
198 .count(&txn)
199 .await?;
200 if creating != 0 {
201 return Err(MetaError::permission_denied(format!(
202 "can not drop {creating} creating streaming job, please cancel them firstly"
203 )));
204 }
205 }
206
207 let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();
208
209 let mut removed_source_ids: Vec<SourceId> = Table::find()
211 .select_only()
212 .column(table::Column::OptionalAssociatedSourceId)
213 .filter(
214 table::Column::TableId
215 .is_in(removed_table_ids)
216 .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
217 )
218 .into_tuple()
219 .all(&txn)
220 .await?;
221 let removed_source_objs: Vec<PartialObject> = Object::find()
222 .filter(object::Column::Oid.is_in(removed_source_ids.clone()))
223 .into_partial_model()
224 .all(&txn)
225 .await?;
226 removed_objects.extend(removed_source_objs);
227 if object_type == ObjectType::Source {
228 removed_source_ids.push(object_id);
229 }
230
231 let removed_secret_ids = removed_objects
232 .iter()
233 .filter(|obj| obj.obj_type == ObjectType::Secret)
234 .map(|obj| obj.oid)
235 .collect_vec();
236
237 if !removed_streaming_job_ids.is_empty() {
238 let removed_internal_table_objs: Vec<PartialObject> = Object::find()
239 .select_only()
240 .columns([
241 object::Column::Oid,
242 object::Column::ObjType,
243 object::Column::SchemaId,
244 object::Column::DatabaseId,
245 ])
246 .join(JoinType::InnerJoin, object::Relation::Table.def())
247 .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
248 .into_partial_model()
249 .all(&txn)
250 .await?;
251
252 removed_state_table_ids.extend(removed_internal_table_objs.iter().map(|obj| obj.oid));
253 removed_objects.extend(removed_internal_table_objs);
254 }
255
256 let removed_objects: HashMap<_, _> = removed_objects
257 .into_iter()
258 .map(|obj| (obj.oid, obj))
259 .collect();
260
261 for obj in removed_objects.values() {
263 if let Some(obj_database_id) = obj.database_id
264 && obj_database_id != database_id
265 {
266 return Err(MetaError::permission_denied(format!(
267 "Referenced by other objects in database {obj_database_id}, please drop them manually"
268 )));
269 }
270 }
271
272 let (removed_source_fragments, removed_actors, removed_fragments) =
273 get_fragments_for_jobs(&txn, removed_streaming_job_ids.clone()).await?;
274
275 let updated_user_ids: Vec<UserId> = UserPrivilege::find()
277 .select_only()
278 .distinct()
279 .column(user_privilege::Column::UserId)
280 .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
281 .into_tuple()
282 .all(&txn)
283 .await?;
284 let dropped_tables = Table::find()
285 .find_also_related(Object)
286 .filter(
287 table::Column::TableId.is_in(
288 removed_state_table_ids
289 .iter()
290 .copied()
291 .collect::<HashSet<ObjectId>>(),
292 ),
293 )
294 .all(&txn)
295 .await?
296 .into_iter()
297 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
298 let res = Object::delete_many()
300 .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
301 .exec(&txn)
302 .await?;
303 if res.rows_affected == 0 {
304 return Err(MetaError::catalog_id_not_found(
305 object_type.as_str(),
306 object_id,
307 ));
308 }
309 let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;
310
311 txn.commit().await?;
312
313 self.notify_users_update(user_infos).await;
315 inner
316 .dropped_tables
317 .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));
318 let version = match object_type {
319 ObjectType::Database => {
320 self.notify_frontend(
322 NotificationOperation::Delete,
323 NotificationInfo::Database(PbDatabase {
324 id: database_id as _,
325 ..Default::default()
326 }),
327 )
328 .await
329 }
330 ObjectType::Schema => {
331 let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
332 .into_values()
333 .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
334 let schema_obj = schema_obj
335 .into_iter()
336 .exactly_one()
337 .expect("schema object not found");
338 to_notify_objs.push(schema_obj);
339
340 let relation_group = build_object_group_for_delete(to_notify_objs);
341 self.notify_frontend(NotificationOperation::Delete, relation_group)
342 .await
343 }
344 _ => {
345 let relation_group =
348 build_object_group_for_delete(removed_objects.into_values().collect());
349 self.notify_frontend(NotificationOperation::Delete, relation_group)
350 .await
351 }
352 };
353
354 let fragment_mappings = removed_fragments
355 .iter()
356 .map(|fragment_id| PbFragmentWorkerSlotMapping {
357 fragment_id: *fragment_id as _,
358 mapping: None,
359 })
360 .collect();
361
362 self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings)
363 .await;
364
365 Ok((
366 ReleaseContext {
367 database_id,
368 removed_streaming_job_ids,
369 removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
370 removed_source_ids,
371 removed_secret_ids,
372 removed_source_fragments,
373 removed_actors,
374 removed_fragments,
375 },
376 version,
377 ))
378 }
379
380 pub async fn try_abort_creating_subscription(
381 &self,
382 subscription_id: SubscriptionId,
383 ) -> MetaResult<()> {
384 let inner = self.inner.write().await;
385 let txn = inner.db.begin().await?;
386
387 let subscription = Subscription::find_by_id(subscription_id).one(&txn).await?;
388 let Some(subscription) = subscription else {
389 tracing::warn!(
390 subscription_id,
391 "subscription not found when aborting creation, might be cleaned by recovery"
392 );
393 return Ok(());
394 };
395
396 if subscription.subscription_state == PbSubscriptionState::Created as i32 {
397 tracing::warn!(
398 subscription_id,
399 "subscription is already created when aborting creation"
400 );
401 return Ok(());
402 }
403
404 subscription.delete(&txn).await?;
405 Ok(())
406 }
407}
408
409async fn report_drop_object(
410 object_type: ObjectType,
411 object_id: ObjectId,
412 txn: &DatabaseTransaction,
413) {
414 let connector_name = {
415 match object_type {
416 ObjectType::Sink => Sink::find_by_id(object_id)
417 .select_only()
418 .column(sink::Column::Properties)
419 .into_tuple::<Property>()
420 .one(txn)
421 .await
422 .ok()
423 .flatten()
424 .and_then(|properties| properties.inner_ref().get("connector").cloned()),
425 ObjectType::Source => Source::find_by_id(object_id)
426 .select_only()
427 .column(source::Column::WithProperties)
428 .into_tuple::<Property>()
429 .one(txn)
430 .await
431 .ok()
432 .flatten()
433 .and_then(|properties| properties.inner_ref().get("connector").cloned()),
434 _ => unreachable!(),
435 }
436 };
437 if let Some(connector_name) = connector_name {
438 report_event(
439 PbTelemetryEventStage::DropStreamJob,
440 "source",
441 object_id.into(),
442 Some(connector_name),
443 Some(match object_type {
444 ObjectType::Source => PbTelemetryDatabaseObject::Source,
445 ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
446 _ => unreachable!(),
447 }),
448 None,
449 );
450 }
451}