risingwave_meta/controller/catalog/
drop_op.rs1use risingwave_pb::catalog::PbTable;
16use risingwave_pb::catalog::subscription::PbSubscriptionState;
17use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
18use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter};
19
20use super::*;
21impl CatalogController {
22 pub async fn drop_object(
25 &self,
26 object_type: ObjectType,
27 object_id: ObjectId,
28 drop_mode: DropMode,
29 ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
30 let mut inner = self.inner.write().await;
31 let txn = inner.db.begin().await?;
32
33 let obj: PartialObject = Object::find_by_id(object_id)
34 .into_partial_model()
35 .one(&txn)
36 .await?
37 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
38 assert_eq!(obj.obj_type, object_type);
39 let drop_database = object_type == ObjectType::Database;
40 let database_id = if object_type == ObjectType::Database {
41 object_id
42 } else {
43 obj.database_id
44 .ok_or_else(|| anyhow!("dropped object should have database_id"))?
45 };
46
47 if obj.obj_type == ObjectType::Subscription {
49 validate_subscription_deletion(&txn, object_id).await?;
50 }
51
52 let mut removed_objects = match drop_mode {
53 DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
54 DropMode::Restrict => match object_type {
55 ObjectType::Database => unreachable!("database always be dropped in cascade mode"),
56 ObjectType::Schema => {
57 ensure_schema_empty(object_id, &txn).await?;
58 Default::default()
59 }
60 ObjectType::Table => {
61 check_object_refer_for_drop(object_type, object_id, &txn).await?;
62 let indexes = get_referring_objects(object_id, &txn).await?;
63 for obj in indexes.iter().filter(|object| {
64 object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
65 }) {
66 report_drop_object(obj.obj_type, obj.oid, &txn).await;
67 }
68 assert!(
69 indexes.iter().all(|obj| obj.obj_type == ObjectType::Index),
70 "only index could be dropped in restrict mode"
71 );
72 for idx in &indexes {
73 check_object_refer_for_drop(idx.obj_type, idx.oid, &txn).await?;
74 }
75 indexes
76 }
77 object_type @ (ObjectType::Source | ObjectType::Sink) => {
78 check_object_refer_for_drop(object_type, object_id, &txn).await?;
79 report_drop_object(object_type, object_id, &txn).await;
80 vec![]
81 }
82
83 ObjectType::View
84 | ObjectType::Index
85 | ObjectType::Function
86 | ObjectType::Connection
87 | ObjectType::Subscription
88 | ObjectType::Secret => {
89 check_object_refer_for_drop(object_type, object_id, &txn).await?;
90 vec![]
91 }
92 },
93 };
94 removed_objects.push(obj);
95 let mut removed_object_ids: HashSet<_> =
96 removed_objects.iter().map(|obj| obj.oid).collect();
97
98 let incoming_sink_ids: Vec<SinkId> = Sink::find()
101 .select_only()
102 .column(sink::Column::SinkId)
103 .filter(sink::Column::TargetTable.is_in(removed_object_ids.clone()))
104 .into_tuple()
105 .all(&txn)
106 .await?;
107 if !incoming_sink_ids.is_empty() {
108 if self.env.opts.protect_drop_table_with_incoming_sink {
109 let sink_names: Vec<String> = Sink::find()
110 .select_only()
111 .column(sink::Column::Name)
112 .filter(sink::Column::SinkId.is_in(incoming_sink_ids.clone()))
113 .into_tuple()
114 .all(&txn)
115 .await?;
116
117 return Err(MetaError::permission_denied(format!(
118 "Table used by incoming sinks: {:?}, please drop them manually",
119 sink_names
120 )));
121 }
122
123 let removed_sink_objs: Vec<PartialObject> = Object::find()
124 .filter(object::Column::Oid.is_in(incoming_sink_ids))
125 .into_partial_model()
126 .all(&txn)
127 .await?;
128
129 removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
130 removed_objects.extend(removed_sink_objs);
131 }
132
133 for obj in &removed_objects {
134 if obj.obj_type == ObjectType::Sink {
135 let sink = Sink::find_by_id(obj.oid)
136 .one(&txn)
137 .await?
138 .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;
139
140 if let Some(target_table) = sink.target_table
141 && !removed_object_ids.contains(&target_table)
142 && !has_table_been_migrated(&txn, target_table).await?
143 {
144 return Err(anyhow::anyhow!(
145 "Dropping sink into table is not allowed for unmigrated table {}. Please migrate it first.",
146 target_table
147 ).into());
148 }
149 }
150 }
151
152 if object_type != ObjectType::Table || drop_database {
155 for obj in &removed_objects {
156 if obj.obj_type == ObjectType::Table {
158 let table = Table::find_by_id(obj.oid)
159 .one(&txn)
160 .await?
161 .ok_or_else(|| MetaError::catalog_id_not_found("table", obj.oid))?;
162 if matches!(table.engine, Some(table::Engine::Iceberg)) {
163 return Err(MetaError::permission_denied(format!(
164 "Found iceberg table in dependency: {}, please drop it manually",
165 table.name,
166 )));
167 }
168 }
169 }
170 }
171
172 let removed_table_ids = removed_objects
173 .iter()
174 .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
175 .map(|obj| obj.oid);
176
177 let removed_streaming_job_ids: Vec<ObjectId> = StreamingJob::find()
178 .select_only()
179 .column(streaming_job::Column::JobId)
180 .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
181 .into_tuple()
182 .all(&txn)
183 .await?;
184
185 if !removed_streaming_job_ids.is_empty() {
187 let creating = StreamingJob::find()
188 .filter(
189 streaming_job::Column::JobStatus
190 .ne(JobStatus::Created)
191 .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
192 )
193 .count(&txn)
194 .await?;
195 if creating != 0 {
196 if creating == 1 && object_type == ObjectType::Sink {
197 info!("dropping creating sink job, it will be cancelled");
198 } else {
199 return Err(MetaError::permission_denied(format!(
200 "can not drop {creating} creating streaming job, please cancel them firstly"
201 )));
202 }
203 }
204 }
205
206 let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();
207
208 let mut removed_source_ids: Vec<SourceId> = Table::find()
210 .select_only()
211 .column(table::Column::OptionalAssociatedSourceId)
212 .filter(
213 table::Column::TableId
214 .is_in(removed_table_ids)
215 .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
216 )
217 .into_tuple()
218 .all(&txn)
219 .await?;
220 let removed_source_objs: Vec<PartialObject> = Object::find()
221 .filter(object::Column::Oid.is_in(removed_source_ids.clone()))
222 .into_partial_model()
223 .all(&txn)
224 .await?;
225 removed_objects.extend(removed_source_objs);
226 if object_type == ObjectType::Source {
227 removed_source_ids.push(object_id);
228 }
229
230 let removed_secret_ids = removed_objects
231 .iter()
232 .filter(|obj| obj.obj_type == ObjectType::Secret)
233 .map(|obj| obj.oid)
234 .collect_vec();
235
236 if !removed_streaming_job_ids.is_empty() {
237 let removed_internal_table_objs: Vec<PartialObject> = Object::find()
238 .select_only()
239 .columns([
240 object::Column::Oid,
241 object::Column::ObjType,
242 object::Column::SchemaId,
243 object::Column::DatabaseId,
244 ])
245 .join(JoinType::InnerJoin, object::Relation::Table.def())
246 .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
247 .into_partial_model()
248 .all(&txn)
249 .await?;
250
251 removed_state_table_ids.extend(removed_internal_table_objs.iter().map(|obj| obj.oid));
252 removed_objects.extend(removed_internal_table_objs);
253 }
254
255 let removed_objects: HashMap<_, _> = removed_objects
256 .into_iter()
257 .map(|obj| (obj.oid, obj))
258 .collect();
259
260 for obj in removed_objects.values() {
262 if let Some(obj_database_id) = obj.database_id
263 && obj_database_id != database_id
264 {
265 return Err(MetaError::permission_denied(format!(
266 "Referenced by other objects in database {obj_database_id}, please drop them manually"
267 )));
268 }
269 }
270
271 let (removed_source_fragments, removed_sink_fragments, removed_actors, removed_fragments) =
272 get_fragments_for_jobs(&txn, removed_streaming_job_ids.clone()).await?;
273
274 let sink_target_fragments = fetch_target_fragments(&txn, removed_sink_fragments).await?;
275 let mut removed_sink_fragment_by_targets = HashMap::new();
276 for (sink_fragment, target_fragments) in sink_target_fragments {
277 assert!(
278 target_fragments.len() <= 1,
279 "sink should have at most one downstream fragment"
280 );
281 if let Some(target_fragment) = target_fragments.first()
282 && !removed_fragments.contains(target_fragment)
283 {
284 removed_sink_fragment_by_targets
285 .entry(*target_fragment)
286 .or_insert_with(Vec::new)
287 .push(sink_fragment);
288 }
289 }
290
291 let updated_user_ids: Vec<UserId> = UserPrivilege::find()
293 .select_only()
294 .distinct()
295 .column(user_privilege::Column::UserId)
296 .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
297 .into_tuple()
298 .all(&txn)
299 .await?;
300 let dropped_tables = Table::find()
301 .find_also_related(Object)
302 .filter(
303 table::Column::TableId.is_in(
304 removed_state_table_ids
305 .iter()
306 .copied()
307 .collect::<HashSet<ObjectId>>(),
308 ),
309 )
310 .all(&txn)
311 .await?
312 .into_iter()
313 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
314 let res = Object::delete_many()
316 .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
317 .exec(&txn)
318 .await?;
319 if res.rows_affected == 0 {
320 return Err(MetaError::catalog_id_not_found(
321 object_type.as_str(),
322 object_id,
323 ));
324 }
325 let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;
326
327 txn.commit().await?;
328
329 self.notify_users_update(user_infos).await;
331 inner
332 .dropped_tables
333 .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));
334
335 let version = match object_type {
336 ObjectType::Database => {
337 self.notify_frontend(
339 NotificationOperation::Delete,
340 NotificationInfo::Database(PbDatabase {
341 id: database_id as _,
342 ..Default::default()
343 }),
344 )
345 .await
346 }
347 ObjectType::Schema => {
348 let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
349 .into_values()
350 .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
351 let schema_obj = schema_obj
352 .into_iter()
353 .exactly_one()
354 .expect("schema object not found");
355 to_notify_objs.push(schema_obj);
356
357 let relation_group = build_object_group_for_delete(to_notify_objs);
358 self.notify_frontend(NotificationOperation::Delete, relation_group)
359 .await
360 }
361 _ => {
362 let relation_group =
365 build_object_group_for_delete(removed_objects.into_values().collect());
366 self.notify_frontend(NotificationOperation::Delete, relation_group)
367 .await
368 }
369 };
370
371 Ok((
372 ReleaseContext {
373 database_id,
374 removed_streaming_job_ids,
375 removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
376 removed_source_ids,
377 removed_secret_ids,
378 removed_source_fragments,
379 removed_actors,
380 removed_fragments,
381 removed_sink_fragment_by_targets,
382 },
383 version,
384 ))
385 }
386
387 pub async fn try_abort_creating_subscription(
388 &self,
389 subscription_id: SubscriptionId,
390 ) -> MetaResult<()> {
391 let inner = self.inner.write().await;
392 let txn = inner.db.begin().await?;
393
394 let subscription = Subscription::find_by_id(subscription_id).one(&txn).await?;
395 let Some(subscription) = subscription else {
396 tracing::warn!(
397 subscription_id,
398 "subscription not found when aborting creation, might be cleaned by recovery"
399 );
400 return Ok(());
401 };
402
403 if subscription.subscription_state == PbSubscriptionState::Created as i32 {
404 tracing::warn!(
405 subscription_id,
406 "subscription is already created when aborting creation"
407 );
408 return Ok(());
409 }
410
411 subscription.delete(&txn).await?;
412 Ok(())
413 }
414}
415
416async fn report_drop_object(
417 object_type: ObjectType,
418 object_id: ObjectId,
419 txn: &DatabaseTransaction,
420) {
421 let connector_name = {
422 match object_type {
423 ObjectType::Sink => Sink::find_by_id(object_id)
424 .select_only()
425 .column(sink::Column::Properties)
426 .into_tuple::<Property>()
427 .one(txn)
428 .await
429 .ok()
430 .flatten()
431 .and_then(|properties| properties.inner_ref().get("connector").cloned()),
432 ObjectType::Source => Source::find_by_id(object_id)
433 .select_only()
434 .column(source::Column::WithProperties)
435 .into_tuple::<Property>()
436 .one(txn)
437 .await
438 .ok()
439 .flatten()
440 .and_then(|properties| properties.inner_ref().get("connector").cloned()),
441 _ => unreachable!(),
442 }
443 };
444 if let Some(connector_name) = connector_name {
445 report_event(
446 PbTelemetryEventStage::DropStreamJob,
447 "source",
448 object_id.into(),
449 Some(connector_name),
450 Some(match object_type {
451 ObjectType::Source => PbTelemetryDatabaseObject::Source,
452 ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
453 _ => unreachable!(),
454 }),
455 None,
456 );
457 }
458}