// risingwave_meta/controller/catalog/drop_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_pb::catalog::PbTable;
16use risingwave_pb::catalog::subscription::PbSubscriptionState;
17use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
18use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter};
19
20use super::*;
impl CatalogController {
    /// Drop all kinds of objects including databases,
    /// schemas, relations, connections, functions, etc.
    ///
    /// Resolves the full set of objects that must be removed together with
    /// `object_id` according to `drop_mode`, performs a number of safety checks
    /// (creating streaming jobs, cross-database references, iceberg tables,
    /// sink-into-table constraints), deletes all of them in a single
    /// transaction, and notifies affected users and frontends.
    ///
    /// Returns the `ReleaseContext` describing resources to release afterwards
    /// (streaming jobs, state tables, sources, secrets, fragments, actors)
    /// together with the frontend notification version.
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: ObjectId,
        drop_mode: DropMode,
    ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // The object must exist, and its recorded type must match the caller's
        // claimed type; a mismatch is a caller bug, hence the assertion below.
        let obj: PartialObject = Object::find_by_id(object_id)
            .into_partial_model()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
        assert_eq!(obj.obj_type, object_type);
        let drop_database = object_type == ObjectType::Database;
        // A database is its own database id; every other object must carry one.
        let database_id = if object_type == ObjectType::Database {
            object_id
        } else {
            obj.database_id
                .ok_or_else(|| anyhow!("dropped object should have database_id"))?
        };

        // Check the cross-db dependency info to see if the subscription can be dropped.
        if obj.obj_type == ObjectType::Subscription {
            validate_subscription_deletion(&txn, object_id).await?;
        }

        // Collect the additional objects removed together with `object_id`.
        let mut removed_objects = match drop_mode {
            DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
            DropMode::Restrict => match object_type {
                ObjectType::Database => unreachable!("database always be dropped in cascade mode"),
                ObjectType::Schema => {
                    // Restrict-dropping a schema requires it to be empty.
                    ensure_schema_empty(object_id, &txn).await?;
                    Default::default()
                }
                ObjectType::Table => {
                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
                    // In restrict mode, only the table's indexes may be dropped
                    // along with it — any other referrer is rejected below.
                    let indexes = get_referring_objects(object_id, &txn).await?;
                    // NOTE(review): this telemetry loop only matches Source/Sink
                    // referrers, which the assertion right after rejects — so the
                    // events are emitted just before panicking. Confirm this
                    // report-before-assert ordering is intentional.
                    for obj in indexes.iter().filter(|object| {
                        object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
                    }) {
                        report_drop_object(obj.obj_type, obj.oid, &txn).await;
                    }
                    assert!(
                        indexes.iter().all(|obj| obj.obj_type == ObjectType::Index),
                        "only index could be dropped in restrict mode"
                    );
                    // Each index must itself be free of referrers.
                    for idx in &indexes {
                        check_object_refer_for_drop(idx.obj_type, idx.oid, &txn).await?;
                    }
                    indexes
                }
                object_type @ (ObjectType::Source | ObjectType::Sink) => {
                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
                    // Best-effort telemetry event for the dropped connector.
                    report_drop_object(object_type, object_id, &txn).await;
                    vec![]
                }

                ObjectType::View
                | ObjectType::Index
                | ObjectType::Function
                | ObjectType::Connection
                | ObjectType::Subscription
                | ObjectType::Secret => {
                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
                    vec![]
                }
            },
        };
        removed_objects.push(obj);
        let mut removed_object_ids: HashSet<_> =
            removed_objects.iter().map(|obj| obj.oid).collect();

        // TODO: record dependency info in object_dependency table for sink into table.
        // Special handling for 'sink into table'.
        // Sinks whose target table is being removed must be removed as well —
        // unless the option below says that is an error the user must resolve.
        let incoming_sink_ids: Vec<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.is_in(removed_object_ids.clone()))
            .into_tuple()
            .all(&txn)
            .await?;
        if !incoming_sink_ids.is_empty() {
            if self.env.opts.protect_drop_table_with_incoming_sink {
                // Refuse the drop and tell the user which sinks block it.
                let sink_names: Vec<String> = Sink::find()
                    .select_only()
                    .column(sink::Column::Name)
                    .filter(sink::Column::SinkId.is_in(incoming_sink_ids.clone()))
                    .into_tuple()
                    .all(&txn)
                    .await?;

                return Err(MetaError::permission_denied(format!(
                    "Table used by incoming sinks: {:?}, please drop them manually",
                    sink_names
                )));
            }

            // Otherwise, pull the incoming sinks into the removal set.
            let removed_sink_objs: Vec<PartialObject> = Object::find()
                .filter(object::Column::Oid.is_in(incoming_sink_ids))
                .into_partial_model()
                .all(&txn)
                .await?;

            removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
            removed_objects.extend(removed_sink_objs);
        }

        // A sink-into-table may only be dropped on its own if its target table
        // either goes away in the same drop or has already been migrated.
        for obj in &removed_objects {
            if obj.obj_type == ObjectType::Sink {
                let sink = Sink::find_by_id(obj.oid)
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;

                if let Some(target_table) = sink.target_table
                    && !removed_object_ids.contains(&target_table)
                    && !has_table_been_migrated(&txn, target_table).await?
                {
                    return Err(anyhow::anyhow!(
                        "Dropping sink into table is not allowed for unmigrated table {}. Please migrate it first.",
                        target_table
                    ).into());
                }
            }
        }

        // 1. Detect when an Iceberg table is part of the dependencies.
        // 2. Drop database with iceberg tables in it is not supported.
        // Note: a direct `DROP TABLE` (object_type == Table, not a database
        // drop) skips this scan — the guard only applies when iceberg tables
        // would be removed as dependencies of something else.
        if object_type != ObjectType::Table || drop_database {
            for obj in &removed_objects {
                // if the obj is iceberg engine table, bail out
                if obj.obj_type == ObjectType::Table {
                    let table = Table::find_by_id(obj.oid)
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("table", obj.oid))?;
                    if matches!(table.engine, Some(table::Engine::Iceberg)) {
                        return Err(MetaError::permission_denied(format!(
                            "Found iceberg table in dependency: {}, please drop it manually",
                            table.name,
                        )));
                    }
                }
            }
        }

        // Lazy iterator over removed tables/indexes; cloned for reuse below.
        let removed_table_ids = removed_objects
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
            .map(|obj| obj.oid);

        let removed_streaming_job_ids: Vec<ObjectId> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        // Check if there are any streaming jobs that are creating.
        if !removed_streaming_job_ids.is_empty() {
            let creating = StreamingJob::find()
                .filter(
                    streaming_job::Column::JobStatus
                        .ne(JobStatus::Created)
                        .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
                )
                .count(&txn)
                .await?;
            if creating != 0 {
                // Special case: dropping a single still-creating sink is allowed
                // and is treated as cancelling its creation.
                if creating == 1 && object_type == ObjectType::Sink {
                    info!("dropping creating sink job, it will be cancelled");
                } else {
                    return Err(MetaError::permission_denied(format!(
                        "can not drop {creating} creating streaming job, please cancel them firstly"
                    )));
                }
            }
        }

        let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();

        // Add associated sources.
        let mut removed_source_ids: Vec<SourceId> = Table::find()
            .select_only()
            .column(table::Column::OptionalAssociatedSourceId)
            .filter(
                table::Column::TableId
                    .is_in(removed_table_ids)
                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
            )
            .into_tuple()
            .all(&txn)
            .await?;
        let removed_source_objs: Vec<PartialObject> = Object::find()
            .filter(object::Column::Oid.is_in(removed_source_ids.clone()))
            .into_partial_model()
            .all(&txn)
            .await?;
        removed_objects.extend(removed_source_objs);
        // When dropping a source directly, it is part of the released sources too.
        if object_type == ObjectType::Source {
            removed_source_ids.push(object_id);
        }

        let removed_secret_ids = removed_objects
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Secret)
            .map(|obj| obj.oid)
            .collect_vec();

        // Internal (state) tables belonging to removed streaming jobs go away too.
        if !removed_streaming_job_ids.is_empty() {
            let removed_internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
                .into_partial_model()
                .all(&txn)
                .await?;

            removed_state_table_ids.extend(removed_internal_table_objs.iter().map(|obj| obj.oid));
            removed_objects.extend(removed_internal_table_objs);
        }

        // Deduplicate by oid; from here on `removed_objects` is keyed by id.
        let removed_objects: HashMap<_, _> = removed_objects
            .into_iter()
            .map(|obj| (obj.oid, obj))
            .collect();

        // TODO: Support drop cascade for cross-database query.
        for obj in removed_objects.values() {
            if let Some(obj_database_id) = obj.database_id
                && obj_database_id != database_id
            {
                return Err(MetaError::permission_denied(format!(
                    "Referenced by other objects in database {obj_database_id}, please drop them manually"
                )));
            }
        }

        let (removed_source_fragments, removed_sink_fragments, removed_actors, removed_fragments) =
            get_fragments_for_jobs(&txn, removed_streaming_job_ids.clone()).await?;

        // Group removed sink fragments by their surviving downstream (target)
        // fragment, so those targets can be updated after the drop.
        let sink_target_fragments = fetch_target_fragments(&txn, removed_sink_fragments).await?;
        let mut removed_sink_fragment_by_targets = HashMap::new();
        for (sink_fragment, target_fragments) in sink_target_fragments {
            assert!(
                target_fragments.len() <= 1,
                "sink should have at most one downstream fragment"
            );
            // Only targets that are NOT themselves removed need the mapping.
            if let Some(target_fragment) = target_fragments.first()
                && !removed_fragments.contains(target_fragment)
            {
                removed_sink_fragment_by_targets
                    .entry(*target_fragment)
                    .or_insert_with(Vec::new)
                    .push(sink_fragment);
            }
        }

        // Find affect users with privileges on all this objects.
        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
            .select_only()
            .distinct()
            .column(user_privilege::Column::UserId)
            .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
            .into_tuple()
            .all(&txn)
            .await?;
        // Snapshot the dropped tables' catalogs (lazily, consumed after commit)
        // so they can be cached in `inner.dropped_tables`.
        let dropped_tables = Table::find()
            .find_also_related(Object)
            .filter(
                table::Column::TableId.is_in(
                    removed_state_table_ids
                        .iter()
                        .copied()
                        .collect::<HashSet<ObjectId>>(),
                ),
            )
            .all(&txn)
            .await?
            .into_iter()
            .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
        // delete all in to_drop_objects.
        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
            .exec(&txn)
            .await?;
        // Zero rows deleted means the object vanished concurrently.
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found(
                object_type.as_str(),
                object_id,
            ));
        }
        let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;

        txn.commit().await?;

        // notify about them.
        self.notify_users_update(user_infos).await;
        inner
            .dropped_tables
            .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));

        let version = match object_type {
            ObjectType::Database => {
                // TODO: Notify objects in other databases when the cross-database query is supported.
                // For a database drop, a single Database-delete notification suffices.
                self.notify_frontend(
                    NotificationOperation::Delete,
                    NotificationInfo::Database(PbDatabase {
                        id: database_id as _,
                        ..Default::default()
                    }),
                )
                .await
            }
            ObjectType::Schema => {
                // Notify the schema's contents first, then the schema itself
                // (the schema object is pushed last so it is ordered after them).
                let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
                    .into_values()
                    .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
                let schema_obj = schema_obj
                    .into_iter()
                    .exactly_one()
                    .expect("schema object not found");
                to_notify_objs.push(schema_obj);

                let relation_group = build_object_group_for_delete(to_notify_objs);
                self.notify_frontend(NotificationOperation::Delete, relation_group)
                    .await
            }
            _ => {
                // Hummock observers and compactor observers are notified once the corresponding barrier is completed.
                // They only need RelationInfo::Table.
                let relation_group =
                    build_object_group_for_delete(removed_objects.into_values().collect());
                self.notify_frontend(NotificationOperation::Delete, relation_group)
                    .await
            }
        };

        Ok((
            ReleaseContext {
                database_id,
                removed_streaming_job_ids,
                removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
                removed_source_ids,
                removed_secret_ids,
                removed_source_fragments,
                removed_actors,
                removed_fragments,
                removed_sink_fragment_by_targets,
            },
            version,
        ))
    }

    /// Abort a subscription whose creation is in progress by deleting its row.
    ///
    /// Idempotent best-effort cleanup: returns `Ok(())` both when the
    /// subscription no longer exists (e.g. already cleaned up by recovery) and
    /// when it has already finished creating (in which case nothing is deleted).
    pub async fn try_abort_creating_subscription(
        &self,
        subscription_id: SubscriptionId,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let subscription = Subscription::find_by_id(subscription_id).one(&txn).await?;
        let Some(subscription) = subscription else {
            tracing::warn!(
                subscription_id,
                "subscription not found when aborting creation, might be cleaned by recovery"
            );
            return Ok(());
        };

        // A fully-created subscription must not be deleted by an abort path.
        if subscription.subscription_state == PbSubscriptionState::Created as i32 {
            tracing::warn!(
                subscription_id,
                "subscription is already created when aborting creation"
            );
            return Ok(());
        }

        // NOTE(review): the transaction is never explicitly committed here —
        // verify the delete takes effect (or that dropping `txn` commits) as intended.
        subscription.delete(&txn).await?;
        Ok(())
    }
}
415
416async fn report_drop_object(
417    object_type: ObjectType,
418    object_id: ObjectId,
419    txn: &DatabaseTransaction,
420) {
421    let connector_name = {
422        match object_type {
423            ObjectType::Sink => Sink::find_by_id(object_id)
424                .select_only()
425                .column(sink::Column::Properties)
426                .into_tuple::<Property>()
427                .one(txn)
428                .await
429                .ok()
430                .flatten()
431                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
432            ObjectType::Source => Source::find_by_id(object_id)
433                .select_only()
434                .column(source::Column::WithProperties)
435                .into_tuple::<Property>()
436                .one(txn)
437                .await
438                .ok()
439                .flatten()
440                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
441            _ => unreachable!(),
442        }
443    };
444    if let Some(connector_name) = connector_name {
445        report_event(
446            PbTelemetryEventStage::DropStreamJob,
447            "source",
448            object_id.into(),
449            Some(connector_name),
450            Some(match object_type {
451                ObjectType::Source => PbTelemetryDatabaseObject::Source,
452                ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
453                _ => unreachable!(),
454            }),
455            None,
456        );
457    }
458}