risingwave_meta/controller/catalog/drop_op.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use risingwave_pb::catalog::PbTable;
use risingwave_pb::catalog::subscription::PbSubscriptionState;
use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter};

use super::*;
21impl CatalogController {
22    // Drop all kinds of objects including databases,
23    // schemas, relations, connections, functions, etc.
24    pub async fn drop_object(
25        &self,
26        object_type: ObjectType,
27        object_id: ObjectId,
28        drop_mode: DropMode,
29    ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
30        let mut inner = self.inner.write().await;
31        let txn = inner.db.begin().await?;
32
33        let obj: PartialObject = Object::find_by_id(object_id)
34            .into_partial_model()
35            .one(&txn)
36            .await?
37            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
38        assert_eq!(obj.obj_type, object_type);
39        let drop_database = object_type == ObjectType::Database;
40        let database_id = if object_type == ObjectType::Database {
41            object_id
42        } else {
43            obj.database_id
44                .ok_or_else(|| anyhow!("dropped object should have database_id"))?
45        };
46
47        // Check the cross-db dependency info to see if the subscription can be dropped.
48        if obj.obj_type == ObjectType::Subscription {
49            validate_subscription_deletion(&txn, object_id).await?;
50        }
51
52        let mut removed_objects = match drop_mode {
53            DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
54            DropMode::Restrict => match object_type {
55                ObjectType::Database => unreachable!("database always be dropped in cascade mode"),
56                ObjectType::Schema => {
57                    ensure_schema_empty(object_id, &txn).await?;
58                    Default::default()
59                }
60                ObjectType::Table => {
61                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
62                    let indexes = get_referring_objects(object_id, &txn).await?;
63                    for obj in indexes.iter().filter(|object| {
64                        object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
65                    }) {
66                        report_drop_object(obj.obj_type, obj.oid, &txn).await;
67                    }
68                    assert!(
69                        indexes.iter().all(|obj| obj.obj_type == ObjectType::Index),
70                        "only index could be dropped in restrict mode"
71                    );
72                    for idx in &indexes {
73                        check_object_refer_for_drop(idx.obj_type, idx.oid, &txn).await?;
74                    }
75                    indexes
76                }
77                object_type @ (ObjectType::Source | ObjectType::Sink) => {
78                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
79                    report_drop_object(object_type, object_id, &txn).await;
80                    vec![]
81                }
82
83                ObjectType::View
84                | ObjectType::Index
85                | ObjectType::Function
86                | ObjectType::Connection
87                | ObjectType::Subscription
88                | ObjectType::Secret => {
89                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
90                    vec![]
91                }
92            },
93        };
94        removed_objects.push(obj);
95        let mut removed_object_ids: HashSet<_> =
96            removed_objects.iter().map(|obj| obj.oid).collect();
97
98        // TODO: record dependency info in object_dependency table for sink into table.
99        // Special handling for 'sink into table'.
100        let incoming_sink_ids: Vec<SinkId> = Sink::find()
101            .select_only()
102            .column(sink::Column::SinkId)
103            .filter(sink::Column::TargetTable.is_in(removed_object_ids.clone()))
104            .into_tuple()
105            .all(&txn)
106            .await?;
107        if !incoming_sink_ids.is_empty() {
108            if self.env.opts.protect_drop_table_with_incoming_sink {
109                let sink_names: Vec<String> = Sink::find()
110                    .select_only()
111                    .column(sink::Column::Name)
112                    .filter(sink::Column::SinkId.is_in(incoming_sink_ids.clone()))
113                    .into_tuple()
114                    .all(&txn)
115                    .await?;
116
117                return Err(MetaError::permission_denied(format!(
118                    "Table used by incoming sinks: {:?}, please drop them manually",
119                    sink_names
120                )));
121            }
122
123            let removed_sink_objs: Vec<PartialObject> = Object::find()
124                .filter(object::Column::Oid.is_in(incoming_sink_ids))
125                .into_partial_model()
126                .all(&txn)
127                .await?;
128
129            removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
130            removed_objects.extend(removed_sink_objs);
131        }
132
133        for obj in &removed_objects {
134            if obj.obj_type == ObjectType::Sink {
135                let sink = Sink::find_by_id(obj.oid)
136                    .one(&txn)
137                    .await?
138                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;
139
140                if let Some(target_table) = sink.target_table
141                    && !removed_object_ids.contains(&target_table)
142                    && !has_table_been_migrated(&txn, target_table).await?
143                {
144                    return Err(anyhow::anyhow!(
145                        "Dropping sink into table is not allowed for unmigrated table {}. Please migrate it first.",
146                        target_table
147                    ).into());
148                }
149            }
150        }
151
152        // 1. Detect when an Iceberg table is part of the dependencies.
153        // 2. Drop database with iceberg tables in it is not supported.
154        if object_type != ObjectType::Table || drop_database {
155            for obj in &removed_objects {
156                // if the obj is iceberg engine table, bail out
157                if obj.obj_type == ObjectType::Table {
158                    let table = Table::find_by_id(obj.oid)
159                        .one(&txn)
160                        .await?
161                        .ok_or_else(|| MetaError::catalog_id_not_found("table", obj.oid))?;
162                    if matches!(table.engine, Some(table::Engine::Iceberg)) {
163                        return Err(MetaError::permission_denied(format!(
164                            "Found iceberg table in dependency: {}, please drop it manually",
165                            table.name,
166                        )));
167                    }
168                }
169            }
170        }
171
172        let removed_table_ids = removed_objects
173            .iter()
174            .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
175            .map(|obj| obj.oid);
176
177        let removed_streaming_job_ids: Vec<ObjectId> = StreamingJob::find()
178            .select_only()
179            .column(streaming_job::Column::JobId)
180            .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
181            .into_tuple()
182            .all(&txn)
183            .await?;
184
185        // Check if there are any streaming jobs that are creating.
186        if !removed_streaming_job_ids.is_empty() {
187            let creating = StreamingJob::find()
188                .filter(
189                    streaming_job::Column::JobStatus
190                        .ne(JobStatus::Created)
191                        .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
192                )
193                .count(&txn)
194                .await?;
195            if creating != 0 {
196                if creating == 1 && object_type == ObjectType::Sink {
197                    info!("dropping creating sink job, it will be cancelled");
198                } else {
199                    return Err(MetaError::permission_denied(format!(
200                        "can not drop {creating} creating streaming job, please cancel them firstly"
201                    )));
202                }
203            }
204        }
205
206        let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();
207
208        // Add associated sources.
209        let mut removed_source_ids: Vec<SourceId> = Table::find()
210            .select_only()
211            .column(table::Column::OptionalAssociatedSourceId)
212            .filter(
213                table::Column::TableId
214                    .is_in(removed_table_ids)
215                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
216            )
217            .into_tuple()
218            .all(&txn)
219            .await?;
220        let removed_source_objs: Vec<PartialObject> = Object::find()
221            .filter(object::Column::Oid.is_in(removed_source_ids.clone()))
222            .into_partial_model()
223            .all(&txn)
224            .await?;
225        removed_objects.extend(removed_source_objs);
226        if object_type == ObjectType::Source {
227            removed_source_ids.push(object_id);
228        }
229
230        let removed_secret_ids = removed_objects
231            .iter()
232            .filter(|obj| obj.obj_type == ObjectType::Secret)
233            .map(|obj| obj.oid)
234            .collect_vec();
235
236        if !removed_streaming_job_ids.is_empty() {
237            let removed_internal_table_objs: Vec<PartialObject> = Object::find()
238                .select_only()
239                .columns([
240                    object::Column::Oid,
241                    object::Column::ObjType,
242                    object::Column::SchemaId,
243                    object::Column::DatabaseId,
244                ])
245                .join(JoinType::InnerJoin, object::Relation::Table.def())
246                .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
247                .into_partial_model()
248                .all(&txn)
249                .await?;
250
251            removed_state_table_ids.extend(removed_internal_table_objs.iter().map(|obj| obj.oid));
252            removed_objects.extend(removed_internal_table_objs);
253        }
254
255        let removed_objects: HashMap<_, _> = removed_objects
256            .into_iter()
257            .map(|obj| (obj.oid, obj))
258            .collect();
259
260        // TODO: Support drop cascade for cross-database query.
261        for obj in removed_objects.values() {
262            if let Some(obj_database_id) = obj.database_id
263                && obj_database_id != database_id
264            {
265                return Err(MetaError::permission_denied(format!(
266                    "Referenced by other objects in database {obj_database_id}, please drop them manually"
267                )));
268            }
269        }
270
271        let (removed_source_fragments, removed_sink_fragments, removed_actors, removed_fragments) =
272            get_fragments_for_jobs(
273                &txn,
274                self.env.shared_actor_infos(),
275                removed_streaming_job_ids.clone(),
276            )
277            .await?;
278
279        let sink_target_fragments = fetch_target_fragments(&txn, removed_sink_fragments).await?;
280        let mut removed_sink_fragment_by_targets = HashMap::new();
281        for (sink_fragment, target_fragments) in sink_target_fragments {
282            assert!(
283                target_fragments.len() <= 1,
284                "sink should have at most one downstream fragment"
285            );
286            if let Some(target_fragment) = target_fragments.first()
287                && !removed_fragments.contains(target_fragment)
288            {
289                removed_sink_fragment_by_targets
290                    .entry(*target_fragment)
291                    .or_insert_with(Vec::new)
292                    .push(sink_fragment);
293            }
294        }
295
296        // Find affect users with privileges on all this objects.
297        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
298            .select_only()
299            .distinct()
300            .column(user_privilege::Column::UserId)
301            .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
302            .into_tuple()
303            .all(&txn)
304            .await?;
305        let dropped_tables = Table::find()
306            .find_also_related(Object)
307            .filter(
308                table::Column::TableId.is_in(
309                    removed_state_table_ids
310                        .iter()
311                        .copied()
312                        .collect::<HashSet<ObjectId>>(),
313                ),
314            )
315            .all(&txn)
316            .await?
317            .into_iter()
318            .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
319        // delete all in to_drop_objects.
320        let res = Object::delete_many()
321            .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
322            .exec(&txn)
323            .await?;
324        if res.rows_affected == 0 {
325            return Err(MetaError::catalog_id_not_found(
326                object_type.as_str(),
327                object_id,
328            ));
329        }
330        let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;
331
332        txn.commit().await?;
333
334        // notify about them.
335        self.notify_users_update(user_infos).await;
336        inner
337            .dropped_tables
338            .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));
339
340        let version = match object_type {
341            ObjectType::Database => {
342                // TODO: Notify objects in other databases when the cross-database query is supported.
343                self.notify_frontend(
344                    NotificationOperation::Delete,
345                    NotificationInfo::Database(PbDatabase {
346                        id: database_id as _,
347                        ..Default::default()
348                    }),
349                )
350                .await
351            }
352            ObjectType::Schema => {
353                let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
354                    .into_values()
355                    .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
356                let schema_obj = schema_obj
357                    .into_iter()
358                    .exactly_one()
359                    .expect("schema object not found");
360                to_notify_objs.push(schema_obj);
361
362                let relation_group = build_object_group_for_delete(to_notify_objs);
363                self.notify_frontend(NotificationOperation::Delete, relation_group)
364                    .await
365            }
366            _ => {
367                // Hummock observers and compactor observers are notified once the corresponding barrier is completed.
368                // They only need RelationInfo::Table.
369                let relation_group =
370                    build_object_group_for_delete(removed_objects.into_values().collect());
371                self.notify_frontend(NotificationOperation::Delete, relation_group)
372                    .await
373            }
374        };
375
376        Ok((
377            ReleaseContext {
378                database_id,
379                removed_streaming_job_ids,
380                removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
381                removed_source_ids,
382                removed_secret_ids,
383                removed_source_fragments,
384                removed_actors,
385                removed_fragments,
386                removed_sink_fragment_by_targets,
387            },
388            version,
389        ))
390    }
391
392    pub async fn try_abort_creating_subscription(
393        &self,
394        subscription_id: SubscriptionId,
395    ) -> MetaResult<()> {
396        let inner = self.inner.write().await;
397        let txn = inner.db.begin().await?;
398
399        let subscription = Subscription::find_by_id(subscription_id).one(&txn).await?;
400        let Some(subscription) = subscription else {
401            tracing::warn!(
402                subscription_id,
403                "subscription not found when aborting creation, might be cleaned by recovery"
404            );
405            return Ok(());
406        };
407
408        if subscription.subscription_state == PbSubscriptionState::Created as i32 {
409            tracing::warn!(
410                subscription_id,
411                "subscription is already created when aborting creation"
412            );
413            return Ok(());
414        }
415
416        subscription.delete(&txn).await?;
417        Ok(())
418    }
419}
420
421async fn report_drop_object(
422    object_type: ObjectType,
423    object_id: ObjectId,
424    txn: &DatabaseTransaction,
425) {
426    let connector_name = {
427        match object_type {
428            ObjectType::Sink => Sink::find_by_id(object_id)
429                .select_only()
430                .column(sink::Column::Properties)
431                .into_tuple::<Property>()
432                .one(txn)
433                .await
434                .ok()
435                .flatten()
436                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
437            ObjectType::Source => Source::find_by_id(object_id)
438                .select_only()
439                .column(source::Column::WithProperties)
440                .into_tuple::<Property>()
441                .one(txn)
442                .await
443                .ok()
444                .flatten()
445                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
446            _ => unreachable!(),
447        }
448    };
449    if let Some(connector_name) = connector_name {
450        report_event(
451            PbTelemetryEventStage::DropStreamJob,
452            "source",
453            object_id.into(),
454            Some(connector_name),
455            Some(match object_type {
456                ObjectType::Source => PbTelemetryDatabaseObject::Source,
457                ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
458                _ => unreachable!(),
459            }),
460            None,
461        );
462    }
463}