risingwave_meta/controller/catalog/
drop_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::catalog::PbTable;
16use risingwave_pb::catalog::subscription::PbSubscriptionState;
17use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
18use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter};
19
20use super::*;
21impl CatalogController {
22    // Drop all kinds of objects including databases,
23    // schemas, relations, connections, functions, etc.
24    pub async fn drop_object(
25        &self,
26        object_type: ObjectType,
27        object_id: ObjectId,
28        drop_mode: DropMode,
29    ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
30        let mut inner = self.inner.write().await;
31        let txn = inner.db.begin().await?;
32
33        let obj: PartialObject = Object::find_by_id(object_id)
34            .into_partial_model()
35            .one(&txn)
36            .await?
37            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
38        assert_eq!(obj.obj_type, object_type);
39        let drop_database = object_type == ObjectType::Database;
40        let database_id = if object_type == ObjectType::Database {
41            object_id
42        } else {
43            obj.database_id
44                .ok_or_else(|| anyhow!("dropped object should have database_id"))?
45        };
46
47        // Check the cross-db dependency info to see if the subscription can be dropped.
48        if obj.obj_type == ObjectType::Subscription {
49            validate_subscription_deletion(&txn, object_id).await?;
50        }
51
52        let mut removed_objects = match drop_mode {
53            DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
54            DropMode::Restrict => match object_type {
55                ObjectType::Database => unreachable!("database always be dropped in cascade mode"),
56                ObjectType::Schema => {
57                    ensure_schema_empty(object_id, &txn).await?;
58                    Default::default()
59                }
60                ObjectType::Table => {
61                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
62                    let indexes = get_referring_objects(object_id, &txn).await?;
63                    for obj in indexes.iter().filter(|object| {
64                        object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
65                    }) {
66                        report_drop_object(obj.obj_type, obj.oid, &txn).await;
67                    }
68                    assert!(
69                        indexes.iter().all(|obj| obj.obj_type == ObjectType::Index),
70                        "only index could be dropped in restrict mode"
71                    );
72                    indexes
73                }
74                object_type @ (ObjectType::Source | ObjectType::Sink) => {
75                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
76                    report_drop_object(object_type, object_id, &txn).await;
77                    vec![]
78                }
79
80                ObjectType::View
81                | ObjectType::Index
82                | ObjectType::Function
83                | ObjectType::Connection
84                | ObjectType::Subscription
85                | ObjectType::Secret => {
86                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
87                    vec![]
88                }
89            },
90        };
91        removed_objects.push(obj);
92        let mut removed_object_ids: HashSet<_> =
93            removed_objects.iter().map(|obj| obj.oid).collect();
94
95        // TODO: record dependency info in object_dependency table for sink into table.
96        // Special handling for 'sink into table'.
97        let removed_incoming_sinks: Vec<I32Array> = Table::find()
98            .select_only()
99            .column(table::Column::IncomingSinks)
100            .filter(table::Column::TableId.is_in(removed_object_ids.clone()))
101            .into_tuple()
102            .all(&txn)
103            .await?;
104        if !removed_incoming_sinks.is_empty() {
105            let incoming_sink_ids = removed_incoming_sinks
106                .into_iter()
107                .flat_map(|arr| arr.into_inner().into_iter())
108                .collect_vec();
109
110            if self.env.opts.protect_drop_table_with_incoming_sink {
111                let sink_names: Vec<String> = Sink::find()
112                    .select_only()
113                    .column(sink::Column::Name)
114                    .filter(sink::Column::SinkId.is_in(incoming_sink_ids.clone()))
115                    .into_tuple()
116                    .all(&txn)
117                    .await?;
118
119                return Err(MetaError::permission_denied(format!(
120                    "Table used by incoming sinks: {:?}, please drop them manually",
121                    sink_names
122                )));
123            }
124
125            let removed_sink_objs: Vec<PartialObject> = Object::find()
126                .filter(object::Column::Oid.is_in(incoming_sink_ids))
127                .into_partial_model()
128                .all(&txn)
129                .await?;
130
131            removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
132            removed_objects.extend(removed_sink_objs);
133        }
134
135        // When there is a table sink in the dependency chain of drop cascade, an error message needs to be returned currently to manually drop the sink.
136        if object_type != ObjectType::Sink {
137            for obj in &removed_objects {
138                if obj.obj_type == ObjectType::Sink {
139                    let sink = Sink::find_by_id(obj.oid)
140                        .one(&txn)
141                        .await?
142                        .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;
143
144                    // Since dropping the sink into the table requires the frontend to handle some of the logic (regenerating the plan), it’s not compatible with the current cascade dropping.
145                    if let Some(target_table) = sink.target_table
146                        && !removed_object_ids.contains(&target_table)
147                    {
148                        return Err(MetaError::permission_denied(format!(
149                            "Found sink into table in dependency: {}, please drop it manually",
150                            sink.name,
151                        )));
152                    }
153                }
154            }
155        }
156
157        // 1. Detect when an Iceberg table is part of the dependencies.
158        // 2. Drop database with iceberg tables in it is not supported.
159        if object_type != ObjectType::Table || drop_database {
160            for obj in &removed_objects {
161                // if the obj is iceberg engine table, bail out
162                if obj.obj_type == ObjectType::Table {
163                    let table = Table::find_by_id(obj.oid)
164                        .one(&txn)
165                        .await?
166                        .ok_or_else(|| MetaError::catalog_id_not_found("table", obj.oid))?;
167                    if matches!(table.engine, Some(table::Engine::Iceberg)) {
168                        return Err(MetaError::permission_denied(format!(
169                            "Found iceberg table in dependency: {}, please drop it manually",
170                            table.name,
171                        )));
172                    }
173                }
174            }
175        }
176
177        let removed_table_ids = removed_objects
178            .iter()
179            .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
180            .map(|obj| obj.oid);
181
182        let removed_streaming_job_ids: Vec<ObjectId> = StreamingJob::find()
183            .select_only()
184            .column(streaming_job::Column::JobId)
185            .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
186            .into_tuple()
187            .all(&txn)
188            .await?;
189
190        // Check if there are any streaming jobs that are creating.
191        if !removed_streaming_job_ids.is_empty() {
192            let creating = StreamingJob::find()
193                .filter(
194                    streaming_job::Column::JobStatus
195                        .ne(JobStatus::Created)
196                        .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
197                )
198                .count(&txn)
199                .await?;
200            if creating != 0 {
201                if creating == 1 && object_type == ObjectType::Sink {
202                    info!("dropping creating sink job, it will be cancelled");
203                } else {
204                    return Err(MetaError::permission_denied(format!(
205                        "can not drop {creating} creating streaming job, please cancel them firstly"
206                    )));
207                }
208            }
209        }
210
211        let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();
212
213        // Add associated sources.
214        let mut removed_source_ids: Vec<SourceId> = Table::find()
215            .select_only()
216            .column(table::Column::OptionalAssociatedSourceId)
217            .filter(
218                table::Column::TableId
219                    .is_in(removed_table_ids)
220                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
221            )
222            .into_tuple()
223            .all(&txn)
224            .await?;
225        let removed_source_objs: Vec<PartialObject> = Object::find()
226            .filter(object::Column::Oid.is_in(removed_source_ids.clone()))
227            .into_partial_model()
228            .all(&txn)
229            .await?;
230        removed_objects.extend(removed_source_objs);
231        if object_type == ObjectType::Source {
232            removed_source_ids.push(object_id);
233        }
234
235        let removed_secret_ids = removed_objects
236            .iter()
237            .filter(|obj| obj.obj_type == ObjectType::Secret)
238            .map(|obj| obj.oid)
239            .collect_vec();
240
241        if !removed_streaming_job_ids.is_empty() {
242            let removed_internal_table_objs: Vec<PartialObject> = Object::find()
243                .select_only()
244                .columns([
245                    object::Column::Oid,
246                    object::Column::ObjType,
247                    object::Column::SchemaId,
248                    object::Column::DatabaseId,
249                ])
250                .join(JoinType::InnerJoin, object::Relation::Table.def())
251                .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
252                .into_partial_model()
253                .all(&txn)
254                .await?;
255
256            removed_state_table_ids.extend(removed_internal_table_objs.iter().map(|obj| obj.oid));
257            removed_objects.extend(removed_internal_table_objs);
258        }
259
260        let removed_objects: HashMap<_, _> = removed_objects
261            .into_iter()
262            .map(|obj| (obj.oid, obj))
263            .collect();
264
265        // TODO: Support drop cascade for cross-database query.
266        for obj in removed_objects.values() {
267            if let Some(obj_database_id) = obj.database_id
268                && obj_database_id != database_id
269            {
270                return Err(MetaError::permission_denied(format!(
271                    "Referenced by other objects in database {obj_database_id}, please drop them manually"
272                )));
273            }
274        }
275
276        let (removed_source_fragments, removed_actors, removed_fragments) =
277            get_fragments_for_jobs(&txn, removed_streaming_job_ids.clone()).await?;
278
279        // Find affect users with privileges on all this objects.
280        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
281            .select_only()
282            .distinct()
283            .column(user_privilege::Column::UserId)
284            .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
285            .into_tuple()
286            .all(&txn)
287            .await?;
288        let dropped_tables = Table::find()
289            .find_also_related(Object)
290            .filter(
291                table::Column::TableId.is_in(
292                    removed_state_table_ids
293                        .iter()
294                        .copied()
295                        .collect::<HashSet<ObjectId>>(),
296                ),
297            )
298            .all(&txn)
299            .await?
300            .into_iter()
301            .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
302        // delete all in to_drop_objects.
303        let res = Object::delete_many()
304            .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
305            .exec(&txn)
306            .await?;
307        if res.rows_affected == 0 {
308            return Err(MetaError::catalog_id_not_found(
309                object_type.as_str(),
310                object_id,
311            ));
312        }
313        let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;
314
315        txn.commit().await?;
316
317        // notify about them.
318        self.notify_users_update(user_infos).await;
319        inner
320            .dropped_tables
321            .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));
322        let version = match object_type {
323            ObjectType::Database => {
324                // TODO: Notify objects in other databases when the cross-database query is supported.
325                self.notify_frontend(
326                    NotificationOperation::Delete,
327                    NotificationInfo::Database(PbDatabase {
328                        id: database_id as _,
329                        ..Default::default()
330                    }),
331                )
332                .await
333            }
334            ObjectType::Schema => {
335                let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
336                    .into_values()
337                    .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
338                let schema_obj = schema_obj
339                    .into_iter()
340                    .exactly_one()
341                    .expect("schema object not found");
342                to_notify_objs.push(schema_obj);
343
344                let relation_group = build_object_group_for_delete(to_notify_objs);
345                self.notify_frontend(NotificationOperation::Delete, relation_group)
346                    .await
347            }
348            _ => {
349                // Hummock observers and compactor observers are notified once the corresponding barrier is completed.
350                // They only need RelationInfo::Table.
351                let relation_group =
352                    build_object_group_for_delete(removed_objects.into_values().collect());
353                self.notify_frontend(NotificationOperation::Delete, relation_group)
354                    .await
355            }
356        };
357
358        Ok((
359            ReleaseContext {
360                database_id,
361                removed_streaming_job_ids,
362                removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
363                removed_source_ids,
364                removed_secret_ids,
365                removed_source_fragments,
366                removed_actors,
367                removed_fragments,
368            },
369            version,
370        ))
371    }
372
373    pub async fn try_abort_creating_subscription(
374        &self,
375        subscription_id: SubscriptionId,
376    ) -> MetaResult<()> {
377        let inner = self.inner.write().await;
378        let txn = inner.db.begin().await?;
379
380        let subscription = Subscription::find_by_id(subscription_id).one(&txn).await?;
381        let Some(subscription) = subscription else {
382            tracing::warn!(
383                subscription_id,
384                "subscription not found when aborting creation, might be cleaned by recovery"
385            );
386            return Ok(());
387        };
388
389        if subscription.subscription_state == PbSubscriptionState::Created as i32 {
390            tracing::warn!(
391                subscription_id,
392                "subscription is already created when aborting creation"
393            );
394            return Ok(());
395        }
396
397        subscription.delete(&txn).await?;
398        Ok(())
399    }
400}
401
402async fn report_drop_object(
403    object_type: ObjectType,
404    object_id: ObjectId,
405    txn: &DatabaseTransaction,
406) {
407    let connector_name = {
408        match object_type {
409            ObjectType::Sink => Sink::find_by_id(object_id)
410                .select_only()
411                .column(sink::Column::Properties)
412                .into_tuple::<Property>()
413                .one(txn)
414                .await
415                .ok()
416                .flatten()
417                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
418            ObjectType::Source => Source::find_by_id(object_id)
419                .select_only()
420                .column(source::Column::WithProperties)
421                .into_tuple::<Property>()
422                .one(txn)
423                .await
424                .ok()
425                .flatten()
426                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
427            _ => unreachable!(),
428        }
429    };
430    if let Some(connector_name) = connector_name {
431        report_event(
432            PbTelemetryEventStage::DropStreamJob,
433            "source",
434            object_id.into(),
435            Some(connector_name),
436            Some(match object_type {
437                ObjectType::Source => PbTelemetryDatabaseObject::Source,
438                ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
439                _ => unreachable!(),
440            }),
441            None,
442        );
443    }
444}