// risingwave_meta/controller/catalog/drop_op.rs

use risingwave_pb::catalog::PbTable;
use risingwave_pb::catalog::subscription::PbSubscriptionState;
use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter};

use super::*;
impl CatalogController {
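    /// Drop an object (database, schema, table, index, source, sink, view, etc.) together with
    /// the dependent objects resolved according to `drop_mode`, then notify the frontend about
    /// the deletion.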
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        drop_mode: DropMode,
    ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj: PartialObject = Object::find_by_id(object_id)
            .into_partial_model()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
        assert_eq!(obj.obj_type, object_type);
        let drop_database = object_type == ObjectType::Database;
        let database_id = if object_type == ObjectType::Database {
            object_id
        } else {
            obj.database_id
                .ok_or_else(|| anyhow!("dropped object should have database_id"))?
        };

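        // Subscriptions need an extra validation step before they may be dropped.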
        if obj.obj_type == ObjectType::Subscription {
            validate_subscription_deletion(&txn, object_id).await?;
        }

        let mut removed_objects = match drop_mode {
            DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
            DropMode::Restrict => match object_type {
                ObjectType::Database => {
                    unreachable!("a database is always dropped in cascade mode")
                }
                ObjectType::Schema => {
                    ensure_schema_empty(object_id, &txn).await?;
                    Default::default()
                }
                ObjectType::Table => {
                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
                    let indexes = get_referring_objects(object_id, &txn).await?;
                    for obj in indexes.iter().filter(|object| {
                        object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
                    }) {
                        report_drop_object(obj.obj_type, obj.oid, &txn).await;
                    }
                    assert!(
                        indexes.iter().all(|obj| obj.obj_type == ObjectType::Index),
                        "only indexes can be dropped in restrict mode"
                    );
                    indexes
                }
                object_type @ (ObjectType::Source | ObjectType::Sink) => {
                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
                    report_drop_object(object_type, object_id, &txn).await;
                    vec![]
                }

                ObjectType::View
                | ObjectType::Index
                | ObjectType::Function
                | ObjectType::Connection
                | ObjectType::Subscription
                | ObjectType::Secret => {
                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
                    vec![]
                }
            },
        };
        removed_objects.push(obj);
        let mut removed_object_ids: HashSet<_> =
            removed_objects.iter().map(|obj| obj.oid).collect();

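        // Tables being dropped may have incoming sinks (`CREATE SINK ... INTO table`);
        // collect their ids so that they are handled together with the table below.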
        let removed_incoming_sinks: Vec<I32Array> = Table::find()
            .select_only()
            .column(table::Column::IncomingSinks)
            .filter(table::Column::TableId.is_in(removed_object_ids.clone()))
            .into_tuple()
            .all(&txn)
            .await?;
        if !removed_incoming_sinks.is_empty() {
            let incoming_sink_ids = removed_incoming_sinks
                .into_iter()
                .flat_map(|arr| arr.into_inner().into_iter())
                .collect_vec();

            if self.env.opts.protect_drop_table_with_incoming_sink {
                let sink_names: Vec<String> = Sink::find()
                    .select_only()
                    .column(sink::Column::Name)
                    .filter(sink::Column::SinkId.is_in(incoming_sink_ids.clone()))
                    .into_tuple()
                    .all(&txn)
                    .await?;

                return Err(MetaError::permission_denied(format!(
                    "Table is used by incoming sinks: {:?}, please drop them manually",
                    sink_names
                )));
            }

            let removed_sink_objs: Vec<PartialObject> = Object::find()
                .filter(object::Column::Oid.is_in(incoming_sink_ids))
                .into_partial_model()
                .all(&txn)
                .await?;

            removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
            removed_objects.extend(removed_sink_objs);
        }

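        // Unless a sink itself is being dropped, reject the drop if a dependent sink writes into
        // a table that is not part of the removed set.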
        if object_type != ObjectType::Sink {
            for obj in &removed_objects {
                if obj.obj_type == ObjectType::Sink {
                    let sink = Sink::find_by_id(obj.oid)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;

                    if let Some(target_table) = sink.target_table
                        && !removed_object_ids.contains(&target_table)
                    {
                        return Err(MetaError::permission_denied(format!(
                            "Found sink into table in dependency: {}, please drop it manually",
                            sink.name,
                        )));
                    }
                }
            }
        }

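        // Iceberg tables found in the dependency set cannot be dropped implicitly; the user has
        // to drop them manually first.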
        if object_type != ObjectType::Table || drop_database {
            for obj in &removed_objects {
                if obj.obj_type == ObjectType::Table {
                    let table = Table::find_by_id(obj.oid)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("table", obj.oid))?;
                    if matches!(table.engine, Some(table::Engine::Iceberg)) {
                        return Err(MetaError::permission_denied(format!(
                            "Found iceberg table in dependency: {}, please drop it manually",
                            table.name,
                        )));
                    }
                }
            }
        }

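        // Ids of all tables and indexes being removed; the internal state tables of the removed
        // streaming jobs are added to `removed_state_table_ids` later.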
        let removed_table_ids = removed_objects
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
            .map(|obj| obj.oid);

        let removed_streaming_job_ids: Vec<ObjectId> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
            .into_tuple()
            .all(&txn)
            .await?;

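        // Streaming jobs that are still being created cannot be dropped, with one exception:
        // a single creating sink is allowed and will be cancelled instead.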
        if !removed_streaming_job_ids.is_empty() {
            let creating = StreamingJob::find()
                .filter(
                    streaming_job::Column::JobStatus
                        .ne(JobStatus::Created)
                        .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
                )
                .count(&txn)
                .await?;
            if creating != 0 {
                if creating == 1 && object_type == ObjectType::Sink {
                    info!("dropping creating sink job, it will be cancelled");
                } else {
                    return Err(MetaError::permission_denied(format!(
                        "cannot drop {creating} creating streaming job(s), please cancel them first"
                    )));
                }
            }
        }

        let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();

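        // Sources implicitly associated with the removed tables must be dropped together with them.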
        let mut removed_source_ids: Vec<SourceId> = Table::find()
            .select_only()
            .column(table::Column::OptionalAssociatedSourceId)
            .filter(
                table::Column::TableId
                    .is_in(removed_table_ids)
                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
            )
            .into_tuple()
            .all(&txn)
            .await?;
        let removed_source_objs: Vec<PartialObject> = Object::find()
            .filter(object::Column::Oid.is_in(removed_source_ids.clone()))
            .into_partial_model()
            .all(&txn)
            .await?;
        removed_objects.extend(removed_source_objs);
        if object_type == ObjectType::Source {
            removed_source_ids.push(object_id);
        }

        let removed_secret_ids = removed_objects
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Secret)
            .map(|obj| obj.oid)
            .collect_vec();

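        // The internal state tables belonging to the removed streaming jobs are removed as well
        // and have to be included in the notification.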
        if !removed_streaming_job_ids.is_empty() {
            let removed_internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
                .into_partial_model()
                .all(&txn)
                .await?;

            removed_state_table_ids.extend(removed_internal_table_objs.iter().map(|obj| obj.oid));
            removed_objects.extend(removed_internal_table_objs);
        }

        let removed_objects: HashMap<_, _> = removed_objects
            .into_iter()
            .map(|obj| (obj.oid, obj))
            .collect();

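        // All removed objects must belong to the database being operated on; objects referenced
        // from other databases have to be dropped manually first.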
        for obj in removed_objects.values() {
            if let Some(obj_database_id) = obj.database_id
                && obj_database_id != database_id
            {
                return Err(MetaError::permission_denied(format!(
                    "Referenced by other objects in database {obj_database_id}, please drop them manually"
                )));
            }
        }

        let (removed_source_fragments, removed_actors, removed_fragments) =
            get_fragments_for_jobs(&txn, removed_streaming_job_ids.clone()).await?;

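        // Users holding privileges on any of the removed objects need an updated user info
        // notification after the deletion.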
        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
            .select_only()
            .distinct()
            .column(user_privilege::Column::UserId)
            .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
            .into_tuple()
            .all(&txn)
            .await?;
        let dropped_tables = Table::find()
            .find_also_related(Object)
            .filter(
                table::Column::TableId.is_in(
                    removed_state_table_ids
                        .iter()
                        .copied()
                        .collect::<HashSet<ObjectId>>(),
                ),
            )
            .all(&txn)
            .await?
            .into_iter()
            .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
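        // Delete all collected objects in a single statement; if nothing was affected, the target
        // object no longer exists.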
        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found(
                object_type.as_str(),
                object_id,
            ));
        }
        let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;

        txn.commit().await?;

        self.notify_users_update(user_infos).await;
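        // Cache the catalogs of the dropped state tables, keyed by their table ids.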
        inner
            .dropped_tables
            .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));
        let version = match object_type {
            ObjectType::Database => {
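                // For a database drop, only the database id is needed in the delete notification.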
                self.notify_frontend(
                    NotificationOperation::Delete,
                    NotificationInfo::Database(PbDatabase {
                        id: database_id as _,
                        ..Default::default()
                    }),
                )
                .await
            }
            ObjectType::Schema => {
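                // Separate the schema object itself from the other removed objects and append it
                // last to the notification group.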
                let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
                    .into_values()
                    .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
                let schema_obj = schema_obj
                    .into_iter()
                    .exactly_one()
                    .expect("schema object not found");
                to_notify_objs.push(schema_obj);

                let relation_group = build_object_group_for_delete(to_notify_objs);
                self.notify_frontend(NotificationOperation::Delete, relation_group)
                    .await
            }
            _ => {
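                // For all other object types, notify the frontend about every removed object in
                // one group.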
                let relation_group =
                    build_object_group_for_delete(removed_objects.into_values().collect());
                self.notify_frontend(NotificationOperation::Delete, relation_group)
                    .await
            }
        };

        Ok((
            ReleaseContext {
                database_id,
                removed_streaming_job_ids,
                removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
                removed_source_ids,
                removed_secret_ids,
                removed_source_fragments,
                removed_actors,
                removed_fragments,
            },
            version,
        ))
    }

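    /// Best-effort cleanup when the creation of a subscription is aborted: delete the
    /// subscription row unless it is missing or has already reached the `Created` state.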
    pub async fn try_abort_creating_subscription(
        &self,
        subscription_id: SubscriptionId,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let subscription = Subscription::find_by_id(subscription_id).one(&txn).await?;
        let Some(subscription) = subscription else {
            tracing::warn!(
                subscription_id,
                "subscription not found when aborting creation, it might have been cleaned up by recovery"
            );
            return Ok(());
        };

        if subscription.subscription_state == PbSubscriptionState::Created as i32 {
            tracing::warn!(
                subscription_id,
                "subscription is already created when aborting creation"
            );
            return Ok(());
        }

        subscription.delete(&txn).await?;
        txn.commit().await?;
        Ok(())
    }
}

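/// Report a telemetry event for dropping a source or sink, using the `connector` name extracted
/// from its properties.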
async fn report_drop_object(
    object_type: ObjectType,
    object_id: ObjectId,
    txn: &DatabaseTransaction,
) {
    let connector_name = {
        match object_type {
            ObjectType::Sink => Sink::find_by_id(object_id)
                .select_only()
                .column(sink::Column::Properties)
                .into_tuple::<Property>()
                .one(txn)
                .await
                .ok()
                .flatten()
                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
            ObjectType::Source => Source::find_by_id(object_id)
                .select_only()
                .column(source::Column::WithProperties)
                .into_tuple::<Property>()
                .one(txn)
                .await
                .ok()
                .flatten()
                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
            _ => unreachable!(),
        }
    };
    if let Some(connector_name) = connector_name {
        report_event(
            PbTelemetryEventStage::DropStreamJob,
            "source",
            object_id.into(),
            Some(connector_name),
            Some(match object_type {
                ObjectType::Source => PbTelemetryDatabaseObject::Source,
                ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
                _ => unreachable!(),
            }),
            None,
        );
    }
}