risingwave_meta/controller/catalog/
drop_op.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use risingwave_pb::catalog::PbTable;
16use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
17use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, QueryFilter};
18
19use super::*;
20impl CatalogController {
21    // Drop all kinds of objects including databases,
22    // schemas, relations, connections, functions, etc.
23    pub async fn drop_object(
24        &self,
25        object_type: ObjectType,
26        object_id: ObjectId,
27        drop_mode: DropMode,
28    ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
29        let mut inner = self.inner.write().await;
30        let txn = inner.db.begin().await?;
31
32        let obj: PartialObject = Object::find_by_id(object_id)
33            .into_partial_model()
34            .one(&txn)
35            .await?
36            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
37        assert_eq!(obj.obj_type, object_type);
38        let database_id = if object_type == ObjectType::Database {
39            object_id
40        } else {
41            obj.database_id
42                .ok_or_else(|| anyhow!("dropped object should have database_id"))?
43        };
44
45        // Check the cross-db dependency info to see if the subscription can be dropped.
46        if obj.obj_type == ObjectType::Subscription {
47            validate_subscription_deletion(&txn, object_id).await?;
48        }
49
50        let mut removed_objects = match drop_mode {
51            DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
52            DropMode::Restrict => match object_type {
53                ObjectType::Database => unreachable!("database always be dropped in cascade mode"),
54                ObjectType::Schema => {
55                    ensure_schema_empty(object_id, &txn).await?;
56                    Default::default()
57                }
58                ObjectType::Table => {
59                    ensure_object_not_refer(object_type, object_id, &txn).await?;
60                    let indexes = get_referring_objects(object_id, &txn).await?;
61                    for obj in indexes.iter().filter(|object| {
62                        object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
63                    }) {
64                        report_drop_object(obj.obj_type, obj.oid, &txn).await;
65                    }
66                    assert!(
67                        indexes.iter().all(|obj| obj.obj_type == ObjectType::Index),
68                        "only index could be dropped in restrict mode"
69                    );
70                    indexes
71                }
72                object_type @ (ObjectType::Source | ObjectType::Sink) => {
73                    ensure_object_not_refer(object_type, object_id, &txn).await?;
74                    report_drop_object(object_type, object_id, &txn).await;
75                    vec![]
76                }
77
78                ObjectType::View
79                | ObjectType::Index
80                | ObjectType::Function
81                | ObjectType::Connection
82                | ObjectType::Subscription
83                | ObjectType::Secret => {
84                    ensure_object_not_refer(object_type, object_id, &txn).await?;
85                    vec![]
86                }
87            },
88        };
89        removed_objects.push(obj);
90        let mut removed_object_ids: HashSet<_> =
91            removed_objects.iter().map(|obj| obj.oid).collect();
92
93        // TODO: record dependency info in object_dependency table for sink into table.
94        // Special handling for 'sink into table'.
95        let removed_incoming_sinks: Vec<I32Array> = Table::find()
96            .select_only()
97            .column(table::Column::IncomingSinks)
98            .filter(table::Column::TableId.is_in(removed_object_ids.clone()))
99            .into_tuple()
100            .all(&txn)
101            .await?;
102        if !removed_incoming_sinks.is_empty() {
103            let removed_sink_objs: Vec<PartialObject> = Object::find()
104                .filter(
105                    object::Column::Oid.is_in(
106                        removed_incoming_sinks
107                            .into_iter()
108                            .flat_map(|arr| arr.into_inner().into_iter()),
109                    ),
110                )
111                .into_partial_model()
112                .all(&txn)
113                .await?;
114
115            removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
116            removed_objects.extend(removed_sink_objs);
117        }
118
119        // When there is a table sink in the dependency chain of drop cascade, an error message needs to be returned currently to manually drop the sink.
120        if object_type != ObjectType::Sink {
121            for obj in &removed_objects {
122                if obj.obj_type == ObjectType::Sink {
123                    let sink = Sink::find_by_id(obj.oid)
124                        .one(&txn)
125                        .await?
126                        .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;
127
128                    // Since dropping the sink into the table requires the frontend to handle some of the logic (regenerating the plan), it’s not compatible with the current cascade dropping.
129                    if let Some(target_table) = sink.target_table
130                        && !removed_object_ids.contains(&target_table)
131                    {
132                        return Err(MetaError::permission_denied(format!(
133                            "Found sink into table in dependency: {}, please drop it manually",
134                            sink.name,
135                        )));
136                    }
137                }
138            }
139        }
140
141        let removed_table_ids = removed_objects
142            .iter()
143            .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
144            .map(|obj| obj.oid);
145
146        let removed_streaming_job_ids: Vec<ObjectId> = StreamingJob::find()
147            .select_only()
148            .column(streaming_job::Column::JobId)
149            .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
150            .into_tuple()
151            .all(&txn)
152            .await?;
153
154        // Check if there are any streaming jobs that are creating.
155        if !removed_streaming_job_ids.is_empty() {
156            let creating = StreamingJob::find()
157                .filter(
158                    streaming_job::Column::JobStatus
159                        .ne(JobStatus::Created)
160                        .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
161                )
162                .count(&txn)
163                .await?;
164            if creating != 0 {
165                return Err(MetaError::permission_denied(format!(
166                    "can not drop {creating} creating streaming job, please cancel them firstly"
167                )));
168            }
169        }
170
171        let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();
172
173        // Add associated sources.
174        let mut removed_source_ids: Vec<SourceId> = Table::find()
175            .select_only()
176            .column(table::Column::OptionalAssociatedSourceId)
177            .filter(
178                table::Column::TableId
179                    .is_in(removed_table_ids)
180                    .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
181            )
182            .into_tuple()
183            .all(&txn)
184            .await?;
185        let removed_source_objs: Vec<PartialObject> = Object::find()
186            .filter(object::Column::Oid.is_in(removed_source_ids.clone()))
187            .into_partial_model()
188            .all(&txn)
189            .await?;
190        removed_objects.extend(removed_source_objs);
191        if object_type == ObjectType::Source {
192            removed_source_ids.push(object_id);
193        }
194
195        let removed_secret_ids = removed_objects
196            .iter()
197            .filter(|obj| obj.obj_type == ObjectType::Secret)
198            .map(|obj| obj.oid)
199            .collect_vec();
200
201        if !removed_streaming_job_ids.is_empty() {
202            let removed_internal_table_objs: Vec<PartialObject> = Object::find()
203                .select_only()
204                .columns([
205                    object::Column::Oid,
206                    object::Column::ObjType,
207                    object::Column::SchemaId,
208                    object::Column::DatabaseId,
209                ])
210                .join(JoinType::InnerJoin, object::Relation::Table.def())
211                .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
212                .into_partial_model()
213                .all(&txn)
214                .await?;
215
216            removed_state_table_ids.extend(removed_internal_table_objs.iter().map(|obj| obj.oid));
217            removed_objects.extend(removed_internal_table_objs);
218        }
219
220        let removed_objects: HashMap<_, _> = removed_objects
221            .into_iter()
222            .map(|obj| (obj.oid, obj))
223            .collect();
224
225        // TODO: Support drop cascade for cross-database query.
226        for obj in removed_objects.values() {
227            if let Some(obj_database_id) = obj.database_id {
228                if obj_database_id != database_id {
229                    return Err(MetaError::permission_denied(format!(
230                        "Referenced by other objects in database {obj_database_id}, please drop them manually"
231                    )));
232                }
233            }
234        }
235
236        let (removed_source_fragments, removed_actors, removed_fragments) =
237            get_fragments_for_jobs(&txn, removed_streaming_job_ids.clone()).await?;
238
239        // Find affect users with privileges on all this objects.
240        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
241            .select_only()
242            .distinct()
243            .column(user_privilege::Column::UserId)
244            .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
245            .into_tuple()
246            .all(&txn)
247            .await?;
248        let dropped_tables = Table::find()
249            .find_also_related(Object)
250            .filter(
251                table::Column::TableId.is_in(
252                    removed_state_table_ids
253                        .iter()
254                        .copied()
255                        .collect::<HashSet<ObjectId>>(),
256                ),
257            )
258            .all(&txn)
259            .await?
260            .into_iter()
261            .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
262        // delete all in to_drop_objects.
263        let res = Object::delete_many()
264            .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
265            .exec(&txn)
266            .await?;
267        if res.rows_affected == 0 {
268            return Err(MetaError::catalog_id_not_found(
269                object_type.as_str(),
270                object_id,
271            ));
272        }
273        let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;
274
275        txn.commit().await?;
276
277        // notify about them.
278        self.notify_users_update(user_infos).await;
279        inner
280            .dropped_tables
281            .extend(dropped_tables.map(|t| (TableId::try_from(t.id).unwrap(), t)));
282        let version = match object_type {
283            ObjectType::Database => {
284                // TODO: Notify objects in other databases when the cross-database query is supported.
285                self.notify_frontend(
286                    NotificationOperation::Delete,
287                    NotificationInfo::Database(PbDatabase {
288                        id: database_id as _,
289                        ..Default::default()
290                    }),
291                )
292                .await
293            }
294            ObjectType::Schema => {
295                let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
296                    .into_values()
297                    .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
298                let schema_obj = schema_obj
299                    .into_iter()
300                    .exactly_one()
301                    .expect("schema object not found");
302                to_notify_objs.push(schema_obj);
303
304                let relation_group = build_object_group_for_delete(to_notify_objs);
305                self.notify_frontend(NotificationOperation::Delete, relation_group)
306                    .await
307            }
308            _ => {
309                // Hummock observers and compactor observers are notified once the corresponding barrier is completed.
310                // They only need RelationInfo::Table.
311                let relation_group =
312                    build_object_group_for_delete(removed_objects.into_values().collect());
313                self.notify_frontend(NotificationOperation::Delete, relation_group)
314                    .await
315            }
316        };
317
318        let fragment_mappings = removed_fragments
319            .iter()
320            .map(|fragment_id| PbFragmentWorkerSlotMapping {
321                fragment_id: *fragment_id as _,
322                mapping: None,
323            })
324            .collect();
325
326        self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings)
327            .await;
328
329        Ok((
330            ReleaseContext {
331                database_id,
332                removed_streaming_job_ids,
333                removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
334                removed_source_ids,
335                removed_secret_ids,
336                removed_source_fragments,
337                removed_actors,
338                removed_fragments,
339            },
340            version,
341        ))
342    }
343}
344
345async fn report_drop_object(
346    object_type: ObjectType,
347    object_id: ObjectId,
348    txn: &DatabaseTransaction,
349) {
350    let connector_name = {
351        match object_type {
352            ObjectType::Sink => Sink::find_by_id(object_id)
353                .one(txn)
354                .await
355                .ok()
356                .flatten()
357                .and_then(|sink| sink.properties.inner_ref().get("connector").cloned()),
358            ObjectType::Source => Source::find_by_id(object_id)
359                .one(txn)
360                .await
361                .ok()
362                .flatten()
363                .and_then(|source| source.with_properties.inner_ref().get("connector").cloned()),
364            _ => unreachable!(),
365        }
366    };
367    if let Some(connector_name) = connector_name {
368        report_event(
369            PbTelemetryEventStage::DropStreamJob,
370            "source",
371            object_id.into(),
372            Some(connector_name),
373            Some(match object_type {
374                ObjectType::Source => PbTelemetryDatabaseObject::Source,
375                ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
376                _ => unreachable!(),
377            }),
378            None,
379        );
380    }
381}