// risingwave_meta/controller/catalog/drop_op.rs
use risingwave_pb::catalog::PbTable;
use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, QueryFilter};

use super::*;
20impl CatalogController {
21 pub async fn drop_object(
24 &self,
25 object_type: ObjectType,
26 object_id: ObjectId,
27 drop_mode: DropMode,
28 ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
29 let mut inner = self.inner.write().await;
30 let txn = inner.db.begin().await?;
31
32 let obj: PartialObject = Object::find_by_id(object_id)
33 .into_partial_model()
34 .one(&txn)
35 .await?
36 .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
37 assert_eq!(obj.obj_type, object_type);
38 let database_id = if object_type == ObjectType::Database {
39 object_id
40 } else {
41 obj.database_id
42 .ok_or_else(|| anyhow!("dropped object should have database_id"))?
43 };
44
45 if obj.obj_type == ObjectType::Subscription {
47 validate_subscription_deletion(&txn, object_id).await?;
48 }
49
50 let mut removed_objects = match drop_mode {
51 DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
52 DropMode::Restrict => match object_type {
53 ObjectType::Database => unreachable!("database always be dropped in cascade mode"),
54 ObjectType::Schema => {
55 ensure_schema_empty(object_id, &txn).await?;
56 Default::default()
57 }
58 ObjectType::Table => {
59 ensure_object_not_refer(object_type, object_id, &txn).await?;
60 let indexes = get_referring_objects(object_id, &txn).await?;
61 for obj in indexes.iter().filter(|object| {
62 object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
63 }) {
64 report_drop_object(obj.obj_type, obj.oid, &txn).await;
65 }
66 assert!(
67 indexes.iter().all(|obj| obj.obj_type == ObjectType::Index),
68 "only index could be dropped in restrict mode"
69 );
70 indexes
71 }
72 object_type @ (ObjectType::Source | ObjectType::Sink) => {
73 ensure_object_not_refer(object_type, object_id, &txn).await?;
74 report_drop_object(object_type, object_id, &txn).await;
75 vec![]
76 }
77
78 ObjectType::View
79 | ObjectType::Index
80 | ObjectType::Function
81 | ObjectType::Connection
82 | ObjectType::Subscription
83 | ObjectType::Secret => {
84 ensure_object_not_refer(object_type, object_id, &txn).await?;
85 vec![]
86 }
87 },
88 };
89 removed_objects.push(obj);
90 let mut removed_object_ids: HashSet<_> =
91 removed_objects.iter().map(|obj| obj.oid).collect();
92
93 let removed_incoming_sinks: Vec<I32Array> = Table::find()
96 .select_only()
97 .column(table::Column::IncomingSinks)
98 .filter(table::Column::TableId.is_in(removed_object_ids.clone()))
99 .into_tuple()
100 .all(&txn)
101 .await?;
102 if !removed_incoming_sinks.is_empty() {
103 let removed_sink_objs: Vec<PartialObject> = Object::find()
104 .filter(
105 object::Column::Oid.is_in(
106 removed_incoming_sinks
107 .into_iter()
108 .flat_map(|arr| arr.into_inner().into_iter()),
109 ),
110 )
111 .into_partial_model()
112 .all(&txn)
113 .await?;
114
115 removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
116 removed_objects.extend(removed_sink_objs);
117 }
118
119 if object_type != ObjectType::Sink {
121 for obj in &removed_objects {
122 if obj.obj_type == ObjectType::Sink {
123 let sink = Sink::find_by_id(obj.oid)
124 .one(&txn)
125 .await?
126 .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;
127
128 if let Some(target_table) = sink.target_table
130 && !removed_object_ids.contains(&target_table)
131 {
132 return Err(MetaError::permission_denied(format!(
133 "Found sink into table in dependency: {}, please drop it manually",
134 sink.name,
135 )));
136 }
137 }
138 }
139 }
140
141 let removed_table_ids = removed_objects
142 .iter()
143 .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
144 .map(|obj| obj.oid);
145
146 let removed_streaming_job_ids: Vec<ObjectId> = StreamingJob::find()
147 .select_only()
148 .column(streaming_job::Column::JobId)
149 .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
150 .into_tuple()
151 .all(&txn)
152 .await?;
153
154 if !removed_streaming_job_ids.is_empty() {
156 let creating = StreamingJob::find()
157 .filter(
158 streaming_job::Column::JobStatus
159 .ne(JobStatus::Created)
160 .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
161 )
162 .count(&txn)
163 .await?;
164 if creating != 0 {
165 return Err(MetaError::permission_denied(format!(
166 "can not drop {creating} creating streaming job, please cancel them firstly"
167 )));
168 }
169 }
170
171 let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();
172
173 let mut removed_source_ids: Vec<SourceId> = Table::find()
175 .select_only()
176 .column(table::Column::OptionalAssociatedSourceId)
177 .filter(
178 table::Column::TableId
179 .is_in(removed_table_ids)
180 .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
181 )
182 .into_tuple()
183 .all(&txn)
184 .await?;
185 let removed_source_objs: Vec<PartialObject> = Object::find()
186 .filter(object::Column::Oid.is_in(removed_source_ids.clone()))
187 .into_partial_model()
188 .all(&txn)
189 .await?;
190 removed_objects.extend(removed_source_objs);
191 if object_type == ObjectType::Source {
192 removed_source_ids.push(object_id);
193 }
194
195 let removed_secret_ids = removed_objects
196 .iter()
197 .filter(|obj| obj.obj_type == ObjectType::Secret)
198 .map(|obj| obj.oid)
199 .collect_vec();
200
201 if !removed_streaming_job_ids.is_empty() {
202 let removed_internal_table_objs: Vec<PartialObject> = Object::find()
203 .select_only()
204 .columns([
205 object::Column::Oid,
206 object::Column::ObjType,
207 object::Column::SchemaId,
208 object::Column::DatabaseId,
209 ])
210 .join(JoinType::InnerJoin, object::Relation::Table.def())
211 .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
212 .into_partial_model()
213 .all(&txn)
214 .await?;
215
216 removed_state_table_ids.extend(removed_internal_table_objs.iter().map(|obj| obj.oid));
217 removed_objects.extend(removed_internal_table_objs);
218 }
219
220 let removed_objects: HashMap<_, _> = removed_objects
221 .into_iter()
222 .map(|obj| (obj.oid, obj))
223 .collect();
224
225 for obj in removed_objects.values() {
227 if let Some(obj_database_id) = obj.database_id {
228 if obj_database_id != database_id {
229 return Err(MetaError::permission_denied(format!(
230 "Referenced by other objects in database {obj_database_id}, please drop them manually"
231 )));
232 }
233 }
234 }
235
236 let (removed_source_fragments, removed_actors, removed_fragments) =
237 get_fragments_for_jobs(&txn, removed_streaming_job_ids.clone()).await?;
238
239 let updated_user_ids: Vec<UserId> = UserPrivilege::find()
241 .select_only()
242 .distinct()
243 .column(user_privilege::Column::UserId)
244 .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
245 .into_tuple()
246 .all(&txn)
247 .await?;
248 let dropped_tables = Table::find()
249 .find_also_related(Object)
250 .filter(
251 table::Column::TableId.is_in(
252 removed_state_table_ids
253 .iter()
254 .copied()
255 .collect::<HashSet<ObjectId>>(),
256 ),
257 )
258 .all(&txn)
259 .await?
260 .into_iter()
261 .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
262 let res = Object::delete_many()
264 .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
265 .exec(&txn)
266 .await?;
267 if res.rows_affected == 0 {
268 return Err(MetaError::catalog_id_not_found(
269 object_type.as_str(),
270 object_id,
271 ));
272 }
273 let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;
274
275 txn.commit().await?;
276
277 self.notify_users_update(user_infos).await;
279 inner
280 .dropped_tables
281 .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));
282 let version = match object_type {
283 ObjectType::Database => {
284 self.notify_frontend(
286 NotificationOperation::Delete,
287 NotificationInfo::Database(PbDatabase {
288 id: database_id as _,
289 ..Default::default()
290 }),
291 )
292 .await
293 }
294 ObjectType::Schema => {
295 let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
296 .into_values()
297 .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
298 let schema_obj = schema_obj
299 .into_iter()
300 .exactly_one()
301 .expect("schema object not found");
302 to_notify_objs.push(schema_obj);
303
304 let relation_group = build_object_group_for_delete(to_notify_objs);
305 self.notify_frontend(NotificationOperation::Delete, relation_group)
306 .await
307 }
308 _ => {
309 let relation_group =
312 build_object_group_for_delete(removed_objects.into_values().collect());
313 self.notify_frontend(NotificationOperation::Delete, relation_group)
314 .await
315 }
316 };
317
318 let fragment_mappings = removed_fragments
319 .iter()
320 .map(|fragment_id| PbFragmentWorkerSlotMapping {
321 fragment_id: *fragment_id as _,
322 mapping: None,
323 })
324 .collect();
325
326 self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings)
327 .await;
328
329 Ok((
330 ReleaseContext {
331 database_id,
332 removed_streaming_job_ids,
333 removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
334 removed_source_ids,
335 removed_secret_ids,
336 removed_source_fragments,
337 removed_actors,
338 removed_fragments,
339 },
340 version,
341 ))
342 }
343}
344
345async fn report_drop_object(
346 object_type: ObjectType,
347 object_id: ObjectId,
348 txn: &DatabaseTransaction,
349) {
350 let connector_name = {
351 match object_type {
352 ObjectType::Sink => Sink::find_by_id(object_id)
353 .one(txn)
354 .await
355 .ok()
356 .flatten()
357 .and_then(|sink| sink.properties.inner_ref().get("connector").cloned()),
358 ObjectType::Source => Source::find_by_id(object_id)
359 .one(txn)
360 .await
361 .ok()
362 .flatten()
363 .and_then(|source| source.with_properties.inner_ref().get("connector").cloned()),
364 _ => unreachable!(),
365 }
366 };
367 if let Some(connector_name) = connector_name {
368 report_event(
369 PbTelemetryEventStage::DropStreamJob,
370 "source",
371 object_id.into(),
372 Some(connector_name),
373 Some(match object_type {
374 ObjectType::Source => PbTelemetryDatabaseObject::Source,
375 ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
376 _ => unreachable!(),
377 }),
378 None,
379 );
380 }
381}