risingwave_meta/controller/catalog/drop_op.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::catalog::PbTable;
use risingwave_pb::catalog::subscription::PbSubscriptionState;
use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter};

use super::*;
impl CatalogController {
    // Drop all kinds of objects including databases,
    // schemas, relations, connections, functions, etc.
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        drop_mode: DropMode,
    ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let obj: PartialObject = Object::find_by_id(object_id)
            .into_partial_model()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
        assert_eq!(obj.obj_type, object_type);
        let drop_database = object_type == ObjectType::Database;
        let database_id = if object_type == ObjectType::Database {
            object_id
        } else {
            obj.database_id
                .ok_or_else(|| anyhow!("dropped object should have database_id"))?
        };

        // Check the cross-db dependency info to see if the subscription can be dropped.
        if obj.obj_type == ObjectType::Subscription {
            validate_subscription_deletion(&txn, object_id).await?;
        }

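        // Collect the objects to be removed along with the target object. In cascade mode, all
        // transitively referring objects are included; in restrict mode, only indexes on a dropped
        // table are collected and any other reference causes the drop to be rejected.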
        let mut removed_objects = match drop_mode {
            DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
            DropMode::Restrict => match object_type {
                ObjectType::Database => unreachable!("database is always dropped in cascade mode"),
                ObjectType::Schema => {
                    ensure_schema_empty(object_id, &txn).await?;
                    Default::default()
                }
                ObjectType::Table => {
                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
                    let indexes = get_referring_objects(object_id, &txn).await?;
                    for obj in indexes.iter().filter(|object| {
                        object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
                    }) {
                        report_drop_object(obj.obj_type, obj.oid, &txn).await;
                    }
                    assert!(
                        indexes.iter().all(|obj| obj.obj_type == ObjectType::Index),
                        "only indexes can be dropped in restrict mode"
                    );
                    indexes
                }
                object_type @ (ObjectType::Source | ObjectType::Sink) => {
                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
                    report_drop_object(object_type, object_id, &txn).await;
                    vec![]
                }

                ObjectType::View
                | ObjectType::Index
                | ObjectType::Function
                | ObjectType::Connection
                | ObjectType::Subscription
                | ObjectType::Secret => {
                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
                    vec![]
                }
            },
        };
        removed_objects.push(obj);
        let mut removed_object_ids: HashSet<_> =
            removed_objects.iter().map(|obj| obj.oid).collect();

        // TODO: record dependency info in object_dependency table for sink into table.
        // Special handling for 'sink into table'.
        let removed_incoming_sinks: Vec<I32Array> = Table::find()
            .select_only()
            .column(table::Column::IncomingSinks)
            .filter(table::Column::TableId.is_in(removed_object_ids.clone()))
            .into_tuple()
            .all(&txn)
            .await?;
        if !removed_incoming_sinks.is_empty() {
            let incoming_sink_ids = removed_incoming_sinks
                .into_iter()
                .flat_map(|arr| arr.into_inner().into_iter())
                .collect_vec();

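            // If the cluster is configured to protect such tables, refuse the drop and ask the
            // user to drop the incoming sinks first.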
            if self.env.opts.protect_drop_table_with_incoming_sink {
                let sink_names: Vec<String> = Sink::find()
                    .select_only()
                    .column(sink::Column::Name)
                    .filter(sink::Column::SinkId.is_in(incoming_sink_ids.clone()))
                    .into_tuple()
                    .all(&txn)
                    .await?;

                return Err(MetaError::permission_denied(format!(
                    "Table used by incoming sinks: {:?}, please drop them manually",
                    sink_names
                )));
            }

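            // Otherwise, the incoming sinks are dropped together with the table.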
            let removed_sink_objs: Vec<PartialObject> = Object::find()
                .filter(object::Column::Oid.is_in(incoming_sink_ids))
                .into_partial_model()
                .all(&txn)
                .await?;

            removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
            removed_objects.extend(removed_sink_objs);
        }

        // If a sink into a table appears in the drop-cascade dependency chain, we currently return
        // an error and ask the user to drop that sink manually.
        if object_type != ObjectType::Sink {
            for obj in &removed_objects {
                if obj.obj_type == ObjectType::Sink {
                    let sink = Sink::find_by_id(obj.oid)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;

                    // Dropping a sink into a table requires the frontend to handle part of the
                    // logic (regenerating the plan), so it is not compatible with the current
                    // cascade drop.
                    if let Some(target_table) = sink.target_table
                        && !removed_object_ids.contains(&target_table)
                    {
                        return Err(MetaError::permission_denied(format!(
                            "Found sink into table in dependency: {}, please drop it manually",
                            sink.name,
                        )));
                    }
                }
            }
        }

        // 1. Detect when an Iceberg table is part of the dependencies.
        // 2. Dropping a database that contains Iceberg tables is not supported.
        if object_type != ObjectType::Table || drop_database {
            for obj in &removed_objects {
                // If the object is an Iceberg-engine table, bail out.
                if obj.obj_type == ObjectType::Table {
                    let table = Table::find_by_id(obj.oid)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("table", obj.oid))?;
                    if matches!(table.engine, Some(table::Engine::Iceberg)) {
                        return Err(MetaError::permission_denied(format!(
                            "Found iceberg table in dependency: {}, please drop it manually",
                            table.name,
                        )));
                    }
                }
            }
        }

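        // Tables and indexes among the removed objects own state tables; collect their ids as the
        // initial set of state tables to clean up (internal tables are added below).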
        let removed_table_ids = removed_objects
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
            .map(|obj| obj.oid);

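        // Among the removed objects, find those that are streaming jobs.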
        let removed_streaming_job_ids: Vec<ObjectId> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        // Check whether any of the removed streaming jobs are still being created.
        if !removed_streaming_job_ids.is_empty() {
            let creating = StreamingJob::find()
                .filter(
                    streaming_job::Column::JobStatus
                        .ne(JobStatus::Created)
                        .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
                )
                .count(&txn)
                .await?;
            if creating != 0 {
                return Err(MetaError::permission_denied(format!(
                    "cannot drop {creating} creating streaming job(s), please cancel them first"
                )));
            }
        }

        let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();

        // Add associated sources.
        let mut removed_source_ids: Vec<SourceId> = Table::find()
            .select_only()
            .column(table::Column::OptionalAssociatedSourceId)
            .filter(
                table::Column::TableId
                    .is_in(removed_table_ids)
                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
            )
            .into_tuple()
            .all(&txn)
            .await?;
        let removed_source_objs: Vec<PartialObject> = Object::find()
            .filter(object::Column::Oid.is_in(removed_source_ids.clone()))
            .into_partial_model()
            .all(&txn)
            .await?;
        removed_objects.extend(removed_source_objs);
        if object_type == ObjectType::Source {
            removed_source_ids.push(object_id);
        }

        let removed_secret_ids = removed_objects
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Secret)
            .map(|obj| obj.oid)
            .collect_vec();

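        // Internal state tables that belong to the removed streaming jobs are dropped as well.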
        if !removed_streaming_job_ids.is_empty() {
            let removed_internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
                .into_partial_model()
                .all(&txn)
                .await?;

            removed_state_table_ids.extend(removed_internal_table_objs.iter().map(|obj| obj.oid));
            removed_objects.extend(removed_internal_table_objs);
        }

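        // Deduplicate the removed objects by object id.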
        let removed_objects: HashMap<_, _> = removed_objects
            .into_iter()
            .map(|obj| (obj.oid, obj))
            .collect();

        // TODO: Support drop cascade for cross-database query.
        for obj in removed_objects.values() {
            if let Some(obj_database_id) = obj.database_id
                && obj_database_id != database_id
            {
                return Err(MetaError::permission_denied(format!(
                    "Referenced by other objects in database {obj_database_id}, please drop them manually"
                )));
            }
        }

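        // Collect the source fragments, actors and fragments of the removed streaming jobs so the
        // caller can clean them up.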
        let (removed_source_fragments, removed_actors, removed_fragments) =
            get_fragments_for_jobs(&txn, removed_streaming_job_ids.clone()).await?;

        // Find the affected users that hold privileges on any of the removed objects.
        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
            .select_only()
            .distinct()
            .column(user_privilege::Column::UserId)
            .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
            .into_tuple()
            .all(&txn)
            .await?;
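        // Fetch the catalogs of the dropped state tables before deleting them, so that they can be
        // recorded in `dropped_tables` after the transaction commits.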
        let dropped_tables = Table::find()
            .find_also_related(Object)
            .filter(
                table::Column::TableId.is_in(
                    removed_state_table_ids
                        .iter()
                        .copied()
                        .collect::<HashSet<ObjectId>>(),
                ),
            )
            .all(&txn)
            .await?
            .into_iter()
            .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
        // Delete all removed objects in one statement.
        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found(
                object_type.as_str(),
                object_id,
            ));
        }
        let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;

        txn.commit().await?;

        // Notify the affected users about their updated privileges.
        self.notify_users_update(user_infos).await;
        inner
            .dropped_tables
            .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));
        let version = match object_type {
            ObjectType::Database => {
                // TODO: Notify objects in other databases when the cross-database query is supported.
                self.notify_frontend(
                    NotificationOperation::Delete,
                    NotificationInfo::Database(PbDatabase {
                        id: database_id as _,
                        ..Default::default()
                    }),
                )
                .await
            }
            ObjectType::Schema => {
                let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
                    .into_values()
                    .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
                let schema_obj = schema_obj
                    .into_iter()
                    .exactly_one()
                    .expect("schema object not found");
                to_notify_objs.push(schema_obj);

                let relation_group = build_object_group_for_delete(to_notify_objs);
                self.notify_frontend(NotificationOperation::Delete, relation_group)
                    .await
            }
            _ => {
                // Hummock observers and compactor observers are notified once the corresponding barrier is completed.
                // They only need RelationInfo::Table.
                let relation_group =
                    build_object_group_for_delete(removed_objects.into_values().collect());
                self.notify_frontend(NotificationOperation::Delete, relation_group)
                    .await
            }
        };

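        // Clear the worker slot mappings of the removed fragments on the frontends.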
        let fragment_mappings = removed_fragments
            .iter()
            .map(|fragment_id| PbFragmentWorkerSlotMapping {
                fragment_id: *fragment_id as _,
                mapping: None,
            })
            .collect();

        self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings)
            .await;

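        // Hand everything that still needs cleanup outside the catalog (streaming jobs, state
        // tables, sources, secrets, fragments and actors) back to the caller.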
        Ok((
            ReleaseContext {
                database_id,
                removed_streaming_job_ids,
                removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
                removed_source_ids,
                removed_secret_ids,
                removed_source_fragments,
                removed_actors,
                removed_fragments,
            },
            version,
        ))
    }

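    /// Abort a subscription that has not finished creation by deleting its catalog entry.
    /// If the subscription is missing or already created, this is a no-op.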
    pub async fn try_abort_creating_subscription(
        &self,
        subscription_id: SubscriptionId,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let subscription = Subscription::find_by_id(subscription_id).one(&txn).await?;
        let Some(subscription) = subscription else {
            tracing::warn!(
                subscription_id,
                "subscription not found when aborting creation, might be cleaned by recovery"
            );
            return Ok(());
        };

        if subscription.subscription_state == PbSubscriptionState::Created as i32 {
            tracing::warn!(
                subscription_id,
                "subscription is already created when aborting creation"
            );
            return Ok(());
        }

        subscription.delete(&txn).await?;
        txn.commit().await?;
        Ok(())
    }
}

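/// Report a telemetry event for dropping a source or sink, using the connector name taken from the
/// object's properties. If the lookup fails or no connector is found, no event is reported.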
async fn report_drop_object(
    object_type: ObjectType,
    object_id: ObjectId,
    txn: &DatabaseTransaction,
) {
    let connector_name = {
        match object_type {
            ObjectType::Sink => Sink::find_by_id(object_id)
                .select_only()
                .column(sink::Column::Properties)
                .into_tuple::<Property>()
                .one(txn)
                .await
                .ok()
                .flatten()
                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
            ObjectType::Source => Source::find_by_id(object_id)
                .select_only()
                .column(source::Column::WithProperties)
                .into_tuple::<Property>()
                .one(txn)
                .await
                .ok()
                .flatten()
                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
            _ => unreachable!(),
        }
    };
    if let Some(connector_name) = connector_name {
        report_event(
            PbTelemetryEventStage::DropStreamJob,
            "source",
            object_id.into(),
            Some(connector_name),
            Some(match object_type {
                ObjectType::Source => PbTelemetryDatabaseObject::Source,
                ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
                _ => unreachable!(),
            }),
            None,
        );
    }
}