risingwave_meta/controller/catalog/drop_op.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::catalog::PbTable;
use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, QueryFilter};

use super::*;
impl CatalogController {
    /// Drop all kinds of objects, including databases, schemas, relations, connections,
    /// functions, etc.
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        drop_mode: DropMode,
    ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

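        // Fetch the object to be dropped, check that its type matches the request, and
        // resolve the database it belongs to.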
        let obj: PartialObject = Object::find_by_id(object_id)
            .into_partial_model()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
        assert_eq!(obj.obj_type, object_type);
        let drop_database = object_type == ObjectType::Database;
        let database_id = if object_type == ObjectType::Database {
            object_id
        } else {
            obj.database_id
                .ok_or_else(|| anyhow!("dropped object should have database_id"))?
        };

        // Check the cross-db dependency info to see if the subscription can be dropped.
        if obj.obj_type == ObjectType::Subscription {
            validate_subscription_deletion(&txn, object_id).await?;
        }

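        // Determine the full set of objects to drop. In cascade mode this includes every
        // object that transitively refers to the target; in restrict mode the target must
        // not be referenced (a schema must be empty), except that indexes on a table are
        // dropped together with the table.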
        let mut removed_objects = match drop_mode {
            DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
            DropMode::Restrict => match object_type {
                ObjectType::Database => unreachable!("database is always dropped in cascade mode"),
                ObjectType::Schema => {
                    ensure_schema_empty(object_id, &txn).await?;
                    Default::default()
                }
                ObjectType::Table => {
                    ensure_object_not_refer(object_type, object_id, &txn).await?;
                    let indexes = get_referring_objects(object_id, &txn).await?;
                    for obj in indexes.iter().filter(|object| {
                        object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
                    }) {
                        report_drop_object(obj.obj_type, obj.oid, &txn).await;
                    }
                    assert!(
                        indexes.iter().all(|obj| obj.obj_type == ObjectType::Index),
                        "only indexes can be dropped in restrict mode"
                    );
                    indexes
                }
                object_type @ (ObjectType::Source | ObjectType::Sink) => {
                    ensure_object_not_refer(object_type, object_id, &txn).await?;
                    report_drop_object(object_type, object_id, &txn).await;
                    vec![]
                }

                ObjectType::View
                | ObjectType::Index
                | ObjectType::Function
                | ObjectType::Connection
                | ObjectType::Subscription
                | ObjectType::Secret => {
                    ensure_object_not_refer(object_type, object_id, &txn).await?;
                    vec![]
                }
            },
        };
        removed_objects.push(obj);
        let mut removed_object_ids: HashSet<_> =
            removed_objects.iter().map(|obj| obj.oid).collect();

        // TODO: record dependency info in object_dependency table for sink into table.
        // Special handling for 'sink into table': sinks writing into a removed table are
        // dropped as well.
        let removed_incoming_sinks: Vec<I32Array> = Table::find()
            .select_only()
            .column(table::Column::IncomingSinks)
            .filter(table::Column::TableId.is_in(removed_object_ids.clone()))
            .into_tuple()
            .all(&txn)
            .await?;
        if !removed_incoming_sinks.is_empty() {
            let removed_sink_objs: Vec<PartialObject> = Object::find()
                .filter(
                    object::Column::Oid.is_in(
                        removed_incoming_sinks
                            .into_iter()
                            .flat_map(|arr| arr.into_inner().into_iter()),
                    ),
                )
                .into_partial_model()
                .all(&txn)
                .await?;

            removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
            removed_objects.extend(removed_sink_objs);
        }

        // If a sink into a table appears in the dependency chain of a cascade drop, we
        // currently return an error and ask the user to drop that sink manually.
        if object_type != ObjectType::Sink {
            for obj in &removed_objects {
                if obj.obj_type == ObjectType::Sink {
                    let sink = Sink::find_by_id(obj.oid)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;

                    // Dropping a sink that writes into a table requires the frontend to handle
                    // part of the logic (regenerating the plan), so it is not compatible with
                    // cascade dropping for now.
                    if let Some(target_table) = sink.target_table
                        && !removed_object_ids.contains(&target_table)
                    {
                        return Err(MetaError::permission_denied(format!(
                            "Found sink into table in dependency: {}, please drop it manually",
                            sink.name,
                        )));
                    }
                }
            }
        }

        // 1. Detect when an Iceberg table is part of the dependencies.
        // 2. Dropping a database that contains Iceberg tables is not supported.
        if object_type != ObjectType::Table || drop_database {
            for obj in &removed_objects {
                // If the object is an Iceberg engine table, bail out.
                if obj.obj_type == ObjectType::Table {
                    let table = Table::find_by_id(obj.oid)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("table", obj.oid))?;
                    if matches!(table.engine, Some(table::Engine::Iceberg)) {
                        return Err(MetaError::permission_denied(format!(
                            "Found Iceberg table in dependency: {}, please drop it manually",
                            table.name,
                        )));
                    }
                }
            }
        }

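        // Collect the IDs of tables and indexes among the removed objects; together with the
        // internal tables added below, these form the set of state tables to clean up.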
        let removed_table_ids = removed_objects
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
            .map(|obj| obj.oid);

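        // Of the removed objects, find the ones that are registered as streaming jobs.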
        let removed_streaming_job_ids: Vec<ObjectId> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        // Check whether any of the streaming jobs to be dropped are still being created.
        if !removed_streaming_job_ids.is_empty() {
            let creating = StreamingJob::find()
                .filter(
                    streaming_job::Column::JobStatus
                        .ne(JobStatus::Created)
                        .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
                )
                .count(&txn)
                .await?;
            if creating != 0 {
                return Err(MetaError::permission_denied(format!(
                    "cannot drop {creating} creating streaming job(s), please cancel them first"
                )));
            }
        }

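        // The state tables to clean up start with the dropped tables and indexes themselves;
        // the internal tables of the removed streaming jobs are added further below.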
        let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();

        // Add associated sources.
        let mut removed_source_ids: Vec<SourceId> = Table::find()
            .select_only()
            .column(table::Column::OptionalAssociatedSourceId)
            .filter(
                table::Column::TableId
                    .is_in(removed_table_ids)
                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
            )
            .into_tuple()
            .all(&txn)
            .await?;
        let removed_source_objs: Vec<PartialObject> = Object::find()
            .filter(object::Column::Oid.is_in(removed_source_ids.clone()))
            .into_partial_model()
            .all(&txn)
            .await?;
        removed_objects.extend(removed_source_objs);
        if object_type == ObjectType::Source {
            removed_source_ids.push(object_id);
        }

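        // Secrets being dropped, returned in the `ReleaseContext` so the caller can clean them up.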
        let removed_secret_ids = removed_objects
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Secret)
            .map(|obj| obj.oid)
            .collect_vec();

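        // Also drop the internal (state) tables that belong to the removed streaming jobs.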
        if !removed_streaming_job_ids.is_empty() {
            let removed_internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
                .into_partial_model()
                .all(&txn)
                .await?;

            removed_state_table_ids.extend(removed_internal_table_objs.iter().map(|obj| obj.oid));
            removed_objects.extend(removed_internal_table_objs);
        }

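        // Index the removed objects by OID, deduplicating any that were collected twice.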
        let removed_objects: HashMap<_, _> = removed_objects
            .into_iter()
            .map(|obj| (obj.oid, obj))
            .collect();

        // TODO: Support drop cascade for cross-database query.
        for obj in removed_objects.values() {
            if let Some(obj_database_id) = obj.database_id {
                if obj_database_id != database_id {
                    return Err(MetaError::permission_denied(format!(
                        "Referenced by other objects in database {obj_database_id}, please drop them manually"
                    )));
                }
            }
        }

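        // Resolve the source fragments, actors, and fragments of the removed streaming jobs so
        // that they can be released after the catalog objects are deleted.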
        let (removed_source_fragments, removed_actors, removed_fragments) =
            get_fragments_for_jobs(&txn, removed_streaming_job_ids.clone()).await?;

        // Find affected users with privileges on any of the removed objects.
        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
            .select_only()
            .distinct()
            .column(user_privilege::Column::UserId)
            .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
            .into_tuple()
            .all(&txn)
            .await?;
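        // Collect the catalog info of all state tables being dropped; after the transaction
        // commits they are recorded in `inner.dropped_tables`.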
        let dropped_tables = Table::find()
            .find_also_related(Object)
            .filter(
                table::Column::TableId.is_in(
                    removed_state_table_ids
                        .iter()
                        .copied()
                        .collect::<HashSet<ObjectId>>(),
                ),
            )
            .all(&txn)
            .await?
            .into_iter()
            .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
        // Delete all objects collected in `removed_objects`.
        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found(
                object_type.as_str(),
                object_id,
            ));
        }
        let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;

        txn.commit().await?;

        // Notify the affected users about their updated privileges.
        self.notify_users_update(user_infos).await;
        inner
            .dropped_tables
            .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));
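        // Notify frontends of the deleted catalog objects; the shape of the notification
        // depends on the type of the dropped object.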
        let version = match object_type {
            ObjectType::Database => {
                // TODO: Notify objects in other databases when cross-database queries are supported.
                self.notify_frontend(
                    NotificationOperation::Delete,
                    NotificationInfo::Database(PbDatabase {
                        id: database_id as _,
                        ..Default::default()
                    }),
                )
                .await
            }
            ObjectType::Schema => {
                let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
                    .into_values()
                    .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
                let schema_obj = schema_obj
                    .into_iter()
                    .exactly_one()
                    .expect("schema object not found");
                to_notify_objs.push(schema_obj);

                let relation_group = build_object_group_for_delete(to_notify_objs);
                self.notify_frontend(NotificationOperation::Delete, relation_group)
                    .await
            }
            _ => {
                // Hummock observers and compactor observers are notified once the corresponding barrier is completed.
                // They only need RelationInfo::Table.
                let relation_group =
                    build_object_group_for_delete(removed_objects.into_values().collect());
                self.notify_frontend(NotificationOperation::Delete, relation_group)
                    .await
            }
        };

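        // Clear the worker slot mappings of the removed fragments on the frontends.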
        let fragment_mappings = removed_fragments
            .iter()
            .map(|fragment_id| PbFragmentWorkerSlotMapping {
                fragment_id: *fragment_id as _,
                mapping: None,
            })
            .collect();

        self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings)
            .await;

        Ok((
            ReleaseContext {
                database_id,
                removed_streaming_job_ids,
                removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
                removed_source_ids,
                removed_secret_ids,
                removed_source_fragments,
                removed_actors,
                removed_fragments,
            },
            version,
        ))
    }
}

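/// Report a telemetry event for dropping a source or sink, attaching the connector name if it
/// can be resolved. Lookup failures are ignored since telemetry is best-effort.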
async fn report_drop_object(
    object_type: ObjectType,
    object_id: ObjectId,
    txn: &DatabaseTransaction,
) {
    let connector_name = {
        match object_type {
            ObjectType::Sink => Sink::find_by_id(object_id)
                .one(txn)
                .await
                .ok()
                .flatten()
                .and_then(|sink| sink.properties.inner_ref().get("connector").cloned()),
            ObjectType::Source => Source::find_by_id(object_id)
                .one(txn)
                .await
                .ok()
                .flatten()
                .and_then(|source| source.with_properties.inner_ref().get("connector").cloned()),
            _ => unreachable!(),
        }
    };
    if let Some(connector_name) = connector_name {
        report_event(
            PbTelemetryEventStage::DropStreamJob,
            "source",
            object_id.into(),
            Some(connector_name),
            Some(match object_type {
                ObjectType::Source => PbTelemetryDatabaseObject::Source,
                ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
                _ => unreachable!(),
            }),
            None,
        );
    }
}