risingwave_meta/controller/catalog/
drop_op.rs1use risingwave_pb::catalog::PbTable;
16use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
17use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, QueryFilter};
18
19use super::*;
20impl CatalogController {
21 pub async fn drop_object(
24 &self,
25 object_type: ObjectType,
26 object_id: ObjectId,
27 drop_mode: DropMode,
28 ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
29 let mut inner = self.inner.write().await;
30 let txn = inner.db.begin().await?;
31
32 let obj: PartialObject = Object::find_by_id(object_id)
33 .into_partial_model()
34 .one(&txn)
35 .await?
36 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
37 assert_eq!(obj.obj_type, object_type);
38 let drop_database = object_type == ObjectType::Database;
39 let database_id = if object_type == ObjectType::Database {
40 object_id
41 } else {
42 obj.database_id
43 .ok_or_else(|| anyhow!("dropped object should have database_id"))?
44 };
45
46 if obj.obj_type == ObjectType::Subscription {
48 validate_subscription_deletion(&txn, object_id).await?;
49 }
50
51 let mut removed_objects = match drop_mode {
52 DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
53 DropMode::Restrict => match object_type {
54 ObjectType::Database => unreachable!("database always be dropped in cascade mode"),
55 ObjectType::Schema => {
56 ensure_schema_empty(object_id, &txn).await?;
57 Default::default()
58 }
59 ObjectType::Table => {
60 ensure_object_not_refer(object_type, object_id, &txn).await?;
61 let indexes = get_referring_objects(object_id, &txn).await?;
62 for obj in indexes.iter().filter(|object| {
63 object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
64 }) {
65 report_drop_object(obj.obj_type, obj.oid, &txn).await;
66 }
67 assert!(
68 indexes.iter().all(|obj| obj.obj_type == ObjectType::Index),
69 "only index could be dropped in restrict mode"
70 );
71 indexes
72 }
73 object_type @ (ObjectType::Source | ObjectType::Sink) => {
74 ensure_object_not_refer(object_type, object_id, &txn).await?;
75 report_drop_object(object_type, object_id, &txn).await;
76 vec![]
77 }
78
79 ObjectType::View
80 | ObjectType::Index
81 | ObjectType::Function
82 | ObjectType::Connection
83 | ObjectType::Subscription
84 | ObjectType::Secret => {
85 ensure_object_not_refer(object_type, object_id, &txn).await?;
86 vec![]
87 }
88 },
89 };
90 removed_objects.push(obj);
91 let mut removed_object_ids: HashSet<_> =
92 removed_objects.iter().map(|obj| obj.oid).collect();
93
94 let removed_incoming_sinks: Vec<I32Array> = Table::find()
97 .select_only()
98 .column(table::Column::IncomingSinks)
99 .filter(table::Column::TableId.is_in(removed_object_ids.clone()))
100 .into_tuple()
101 .all(&txn)
102 .await?;
103 if !removed_incoming_sinks.is_empty() {
104 let removed_sink_objs: Vec<PartialObject> = Object::find()
105 .filter(
106 object::Column::Oid.is_in(
107 removed_incoming_sinks
108 .into_iter()
109 .flat_map(|arr| arr.into_inner().into_iter()),
110 ),
111 )
112 .into_partial_model()
113 .all(&txn)
114 .await?;
115
116 removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
117 removed_objects.extend(removed_sink_objs);
118 }
119
120 if object_type != ObjectType::Sink {
122 for obj in &removed_objects {
123 if obj.obj_type == ObjectType::Sink {
124 let sink = Sink::find_by_id(obj.oid)
125 .one(&txn)
126 .await?
127 .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;
128
129 if let Some(target_table) = sink.target_table
131 && !removed_object_ids.contains(&target_table)
132 {
133 return Err(MetaError::permission_denied(format!(
134 "Found sink into table in dependency: {}, please drop it manually",
135 sink.name,
136 )));
137 }
138 }
139 }
140 }
141
142 if object_type != ObjectType::Table || drop_database {
145 for obj in &removed_objects {
146 if obj.obj_type == ObjectType::Table {
148 let table = Table::find_by_id(obj.oid)
149 .one(&txn)
150 .await?
151 .ok_or_else(|| MetaError::catalog_id_not_found("table", obj.oid))?;
152 if matches!(table.engine, Some(table::Engine::Iceberg)) {
153 return Err(MetaError::permission_denied(format!(
154 "Found iceberg table in dependency: {}, please drop it manually",
155 table.name,
156 )));
157 }
158 }
159 }
160 }
161
162 let removed_table_ids = removed_objects
163 .iter()
164 .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
165 .map(|obj| obj.oid);
166
167 let removed_streaming_job_ids: Vec<ObjectId> = StreamingJob::find()
168 .select_only()
169 .column(streaming_job::Column::JobId)
170 .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
171 .into_tuple()
172 .all(&txn)
173 .await?;
174
175 if !removed_streaming_job_ids.is_empty() {
177 let creating = StreamingJob::find()
178 .filter(
179 streaming_job::Column::JobStatus
180 .ne(JobStatus::Created)
181 .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
182 )
183 .count(&txn)
184 .await?;
185 if creating != 0 {
186 return Err(MetaError::permission_denied(format!(
187 "can not drop {creating} creating streaming job, please cancel them firstly"
188 )));
189 }
190 }
191
192 let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();
193
194 let mut removed_source_ids: Vec<SourceId> = Table::find()
196 .select_only()
197 .column(table::Column::OptionalAssociatedSourceId)
198 .filter(
199 table::Column::TableId
200 .is_in(removed_table_ids)
201 .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
202 )
203 .into_tuple()
204 .all(&txn)
205 .await?;
206 let removed_source_objs: Vec<PartialObject> = Object::find()
207 .filter(object::Column::Oid.is_in(removed_source_ids.clone()))
208 .into_partial_model()
209 .all(&txn)
210 .await?;
211 removed_objects.extend(removed_source_objs);
212 if object_type == ObjectType::Source {
213 removed_source_ids.push(object_id);
214 }
215
216 let removed_secret_ids = removed_objects
217 .iter()
218 .filter(|obj| obj.obj_type == ObjectType::Secret)
219 .map(|obj| obj.oid)
220 .collect_vec();
221
222 if !removed_streaming_job_ids.is_empty() {
223 let removed_internal_table_objs: Vec<PartialObject> = Object::find()
224 .select_only()
225 .columns([
226 object::Column::Oid,
227 object::Column::ObjType,
228 object::Column::SchemaId,
229 object::Column::DatabaseId,
230 ])
231 .join(JoinType::InnerJoin, object::Relation::Table.def())
232 .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
233 .into_partial_model()
234 .all(&txn)
235 .await?;
236
237 removed_state_table_ids.extend(removed_internal_table_objs.iter().map(|obj| obj.oid));
238 removed_objects.extend(removed_internal_table_objs);
239 }
240
241 let removed_objects: HashMap<_, _> = removed_objects
242 .into_iter()
243 .map(|obj| (obj.oid, obj))
244 .collect();
245
246 for obj in removed_objects.values() {
248 if let Some(obj_database_id) = obj.database_id {
249 if obj_database_id != database_id {
250 return Err(MetaError::permission_denied(format!(
251 "Referenced by other objects in database {obj_database_id}, please drop them manually"
252 )));
253 }
254 }
255 }
256
257 let (removed_source_fragments, removed_actors, removed_fragments) =
258 get_fragments_for_jobs(&txn, removed_streaming_job_ids.clone()).await?;
259
260 let updated_user_ids: Vec<UserId> = UserPrivilege::find()
262 .select_only()
263 .distinct()
264 .column(user_privilege::Column::UserId)
265 .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
266 .into_tuple()
267 .all(&txn)
268 .await?;
269 let dropped_tables = Table::find()
270 .find_also_related(Object)
271 .filter(
272 table::Column::TableId.is_in(
273 removed_state_table_ids
274 .iter()
275 .copied()
276 .collect::<HashSet<ObjectId>>(),
277 ),
278 )
279 .all(&txn)
280 .await?
281 .into_iter()
282 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
283 let res = Object::delete_many()
285 .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
286 .exec(&txn)
287 .await?;
288 if res.rows_affected == 0 {
289 return Err(MetaError::catalog_id_not_found(
290 object_type.as_str(),
291 object_id,
292 ));
293 }
294 let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;
295
296 txn.commit().await?;
297
298 self.notify_users_update(user_infos).await;
300 inner
301 .dropped_tables
302 .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));
303 let version = match object_type {
304 ObjectType::Database => {
305 self.notify_frontend(
307 NotificationOperation::Delete,
308 NotificationInfo::Database(PbDatabase {
309 id: database_id as _,
310 ..Default::default()
311 }),
312 )
313 .await
314 }
315 ObjectType::Schema => {
316 let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
317 .into_values()
318 .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
319 let schema_obj = schema_obj
320 .into_iter()
321 .exactly_one()
322 .expect("schema object not found");
323 to_notify_objs.push(schema_obj);
324
325 let relation_group = build_object_group_for_delete(to_notify_objs);
326 self.notify_frontend(NotificationOperation::Delete, relation_group)
327 .await
328 }
329 _ => {
330 let relation_group =
333 build_object_group_for_delete(removed_objects.into_values().collect());
334 self.notify_frontend(NotificationOperation::Delete, relation_group)
335 .await
336 }
337 };
338
339 let fragment_mappings = removed_fragments
340 .iter()
341 .map(|fragment_id| PbFragmentWorkerSlotMapping {
342 fragment_id: *fragment_id as _,
343 mapping: None,
344 })
345 .collect();
346
347 self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings)
348 .await;
349
350 Ok((
351 ReleaseContext {
352 database_id,
353 removed_streaming_job_ids,
354 removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
355 removed_source_ids,
356 removed_secret_ids,
357 removed_source_fragments,
358 removed_actors,
359 removed_fragments,
360 },
361 version,
362 ))
363 }
364}
365
366async fn report_drop_object(
367 object_type: ObjectType,
368 object_id: ObjectId,
369 txn: &DatabaseTransaction,
370) {
371 let connector_name = {
372 match object_type {
373 ObjectType::Sink => Sink::find_by_id(object_id)
374 .one(txn)
375 .await
376 .ok()
377 .flatten()
378 .and_then(|sink| sink.properties.inner_ref().get("connector").cloned()),
379 ObjectType::Source => Source::find_by_id(object_id)
380 .one(txn)
381 .await
382 .ok()
383 .flatten()
384 .and_then(|source| source.with_properties.inner_ref().get("connector").cloned()),
385 _ => unreachable!(),
386 }
387 };
388 if let Some(connector_name) = connector_name {
389 report_event(
390 PbTelemetryEventStage::DropStreamJob,
391 "source",
392 object_id.into(),
393 Some(connector_name),
394 Some(match object_type {
395 ObjectType::Source => PbTelemetryDatabaseObject::Source,
396 ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
397 _ => unreachable!(),
398 }),
399 None,
400 );
401 }
402}