risingwave_meta/controller/catalog/
drop_op.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
16use risingwave_pb::catalog::PbTable;
17use risingwave_pb::catalog::subscription::PbSubscriptionState;
18use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
19use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter};
20
21use super::*;
22impl CatalogController {
23    // Drop all kinds of objects including databases,
24    // schemas, relations, connections, functions, etc.
25    pub async fn drop_object(
26        &self,
27        object_type: ObjectType,
28        object_id: impl Into<ObjectId>,
29        drop_mode: DropMode,
30    ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
31        let object_id = object_id.into();
32        let mut inner = self.inner.write().await;
33        let txn = inner.db.begin().await?;
34
35        let obj: PartialObject = Object::find_by_id(object_id)
36            .into_partial_model()
37            .one(&txn)
38            .await?
39            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
40        assert_eq!(obj.obj_type, object_type);
41        let drop_database = object_type == ObjectType::Database;
42        let database_id = if object_type == ObjectType::Database {
43            object_id.as_database_id()
44        } else {
45            obj.database_id
46                .ok_or_else(|| anyhow!("dropped object should have database_id"))?
47        };
48
49        // Check the cross-db dependency info to see if the subscription can be dropped.
50        if obj.obj_type == ObjectType::Subscription {
51            validate_subscription_deletion(&txn, object_id.as_subscription_id()).await?;
52        }
53
54        let mut removed_objects = match drop_mode {
55            DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
56            DropMode::Restrict => match object_type {
57                ObjectType::Database => unreachable!("database always be dropped in cascade mode"),
58                ObjectType::Schema => {
59                    ensure_schema_empty(object_id.as_schema_id(), &txn).await?;
60                    Default::default()
61                }
62                ObjectType::Table => {
63                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
64                    let objects = get_referring_objects(object_id, &txn).await?;
65                    for obj in objects.iter().filter(|object| {
66                        object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
67                    }) {
68                        report_drop_object(obj.obj_type, obj.oid, &txn).await;
69                    }
70                    assert!(
71                        objects.iter().all(|obj| obj.obj_type == ObjectType::Index
72                            || obj.obj_type == ObjectType::Sink),
73                        "only index and iceberg sink could be dropped in restrict mode"
74                    );
75                    for obj in &objects {
76                        check_object_refer_for_drop(obj.obj_type, obj.oid, &txn).await?;
77                    }
78                    objects
79                }
80                object_type @ (ObjectType::Source | ObjectType::Sink) => {
81                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
82                    report_drop_object(object_type, object_id, &txn).await;
83                    vec![]
84                }
85
86                ObjectType::View
87                | ObjectType::Index
88                | ObjectType::Function
89                | ObjectType::Connection
90                | ObjectType::Subscription
91                | ObjectType::Secret => {
92                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
93                    vec![]
94                }
95            },
96        };
97
98        // check iceberg source.
99        if obj.obj_type == ObjectType::Table {
100            let table_name = Table::find_by_id(object_id.as_table_id())
101                .select_only()
102                .column(table::Column::Name)
103                .into_tuple::<String>()
104                .one(&txn)
105                .await?
106                .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
107            let iceberg_source = Source::find()
108                .inner_join(Object)
109                .filter(
110                    object::Column::DatabaseId
111                        .eq(database_id)
112                        .and(object::Column::SchemaId.eq(obj.schema_id.unwrap()))
113                        .and(
114                            source::Column::Name
115                                .eq(format!("{}{}", ICEBERG_SOURCE_PREFIX, table_name)),
116                        ),
117                )
118                .into_partial_model()
119                .one(&txn)
120                .await?;
121            if let Some(iceberg_source) = iceberg_source {
122                removed_objects.push(iceberg_source);
123            }
124        }
125
126        removed_objects.push(obj);
127        let mut removed_object_ids: HashSet<_> =
128            removed_objects.iter().map(|obj| obj.oid).collect();
129
130        // TODO: record dependency info in object_dependency table for sink into table.
131        // Special handling for 'sink into table'.
132        let incoming_sink_ids: Vec<SinkId> = Sink::find()
133            .select_only()
134            .column(sink::Column::SinkId)
135            .filter(sink::Column::TargetTable.is_in(removed_object_ids.clone()))
136            .into_tuple()
137            .all(&txn)
138            .await?;
139        if !incoming_sink_ids.is_empty() {
140            if self.env.opts.protect_drop_table_with_incoming_sink {
141                let sink_names: Vec<String> = Sink::find()
142                    .select_only()
143                    .column(sink::Column::Name)
144                    .filter(sink::Column::SinkId.is_in(incoming_sink_ids.clone()))
145                    .into_tuple()
146                    .all(&txn)
147                    .await?;
148
149                return Err(MetaError::permission_denied(format!(
150                    "Table used by incoming sinks: {:?}, please drop them manually",
151                    sink_names
152                )));
153            }
154
155            let removed_sink_objs: Vec<PartialObject> = Object::find()
156                .filter(object::Column::Oid.is_in(incoming_sink_ids))
157                .into_partial_model()
158                .all(&txn)
159                .await?;
160
161            removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
162            removed_objects.extend(removed_sink_objs);
163        }
164
165        for obj in &removed_objects {
166            if obj.obj_type == ObjectType::Sink {
167                let sink = Sink::find_by_id(obj.oid.as_sink_id())
168                    .one(&txn)
169                    .await?
170                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;
171
172                if let Some(target_table) = sink.target_table
173                    && !removed_object_ids.contains(&target_table.as_object_id())
174                    && !has_table_been_migrated(&txn, target_table).await?
175                {
176                    return Err(anyhow::anyhow!(
177                        "Dropping sink into table is not allowed for unmigrated table {}. Please migrate it first.",
178                        target_table
179                    ).into());
180                }
181            }
182        }
183
184        // 1. Detect when an Iceberg table is part of the dependencies.
185        // 2. Drop database with iceberg tables in it is not supported.
186        if object_type != ObjectType::Table || drop_database {
187            for obj in &removed_objects {
188                // if the obj is iceberg engine table, bail out
189                if obj.obj_type == ObjectType::Table {
190                    let table = Table::find_by_id(obj.oid.as_table_id())
191                        .one(&txn)
192                        .await?
193                        .ok_or_else(|| MetaError::catalog_id_not_found("table", obj.oid))?;
194                    if matches!(table.engine, Some(table::Engine::Iceberg)) {
195                        return Err(MetaError::permission_denied(format!(
196                            "Found iceberg table in dependency: {}, please drop it manually",
197                            table.name,
198                        )));
199                    }
200                }
201            }
202        }
203
204        let removed_table_ids = removed_objects
205            .iter()
206            .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
207            .map(|obj| obj.oid.as_table_id());
208
209        let removed_iceberg_table_sinks: Vec<PbSink> = Sink::find()
210            .find_also_related(Object)
211            .filter(
212                sink::Column::SinkId
213                    .is_in(removed_object_ids.clone())
214                    .and(sink::Column::Name.like(format!("{}%", ICEBERG_SINK_PREFIX))),
215            )
216            .all(&txn)
217            .await?
218            .into_iter()
219            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap()).into())
220            .collect();
221
222        let removed_streaming_job_ids: Vec<JobId> = StreamingJob::find()
223            .select_only()
224            .column(streaming_job::Column::JobId)
225            .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
226            .into_tuple()
227            .all(&txn)
228            .await?;
229
230        // Check if there are any streaming jobs that are creating.
231        if !removed_streaming_job_ids.is_empty() {
232            let creating = StreamingJob::find()
233                .filter(
234                    streaming_job::Column::JobStatus
235                        .ne(JobStatus::Created)
236                        .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
237                )
238                .count(&txn)
239                .await?;
240            if creating != 0 {
241                if creating == 1 && object_type == ObjectType::Sink {
242                    info!("dropping creating sink job, it will be cancelled");
243                } else {
244                    return Err(MetaError::permission_denied(format!(
245                        "can not drop {creating} creating streaming job, please cancel them firstly"
246                    )));
247                }
248            }
249        }
250
251        let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();
252
253        if !drop_database {
254            // Add associated sources.
255            let removed_source_ids: Vec<SourceId> = Table::find()
256                .select_only()
257                .column(table::Column::OptionalAssociatedSourceId)
258                .filter(
259                    table::Column::TableId
260                        .is_in(removed_table_ids)
261                        .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
262                )
263                .into_tuple()
264                .all(&txn)
265                .await?;
266            let removed_source_objs: Vec<PartialObject> = Object::find()
267                .filter(object::Column::Oid.is_in(removed_source_ids))
268                .into_partial_model()
269                .all(&txn)
270                .await?;
271            removed_objects.extend(removed_source_objs);
272        }
273
274        let removed_source_ids: HashSet<_> = removed_objects
275            .iter()
276            .filter(|obj| obj.obj_type == ObjectType::Source)
277            .map(|obj| obj.oid.as_source_id())
278            .collect();
279
280        let removed_secret_ids = removed_objects
281            .iter()
282            .filter(|obj| obj.obj_type == ObjectType::Secret)
283            .map(|obj| obj.oid.as_secret_id())
284            .collect_vec();
285
286        if !removed_streaming_job_ids.is_empty() {
287            let removed_internal_table_objs: Vec<PartialObject> = Object::find()
288                .select_only()
289                .columns([
290                    object::Column::Oid,
291                    object::Column::ObjType,
292                    object::Column::SchemaId,
293                    object::Column::DatabaseId,
294                ])
295                .join(JoinType::InnerJoin, object::Relation::Table.def())
296                .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
297                .into_partial_model()
298                .all(&txn)
299                .await?;
300
301            removed_state_table_ids.extend(
302                removed_internal_table_objs
303                    .iter()
304                    .map(|obj| obj.oid.as_table_id()),
305            );
306            removed_objects.extend(removed_internal_table_objs);
307        }
308
309        let removed_objects: HashMap<_, _> = removed_objects
310            .into_iter()
311            .map(|obj| (obj.oid, obj))
312            .collect();
313
314        // TODO: Support drop cascade for cross-database query.
315        for obj in removed_objects.values() {
316            if let Some(obj_database_id) = obj.database_id
317                && obj_database_id != database_id
318            {
319                return Err(MetaError::permission_denied(format!(
320                    "Referenced by other objects in database {obj_database_id}, please drop them manually"
321                )));
322            }
323        }
324
325        let (removed_source_fragments, removed_sink_fragments, removed_actors, removed_fragments) =
326            get_fragments_for_jobs(
327                &txn,
328                self.env.shared_actor_infos(),
329                removed_streaming_job_ids.clone(),
330            )
331            .await?;
332
333        let sink_target_fragments = fetch_target_fragments(&txn, removed_sink_fragments).await?;
334        let mut removed_sink_fragment_by_targets = HashMap::new();
335        for (sink_fragment, target_fragments) in sink_target_fragments {
336            assert!(
337                target_fragments.len() <= 1,
338                "sink should have at most one downstream fragment"
339            );
340            if let Some(target_fragment) = target_fragments.first()
341                && !removed_fragments.contains(target_fragment)
342            {
343                removed_sink_fragment_by_targets
344                    .entry(*target_fragment)
345                    .or_insert_with(Vec::new)
346                    .push(sink_fragment);
347            }
348        }
349
350        // Find affect users with privileges on all this objects.
351        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
352            .select_only()
353            .distinct()
354            .column(user_privilege::Column::UserId)
355            .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
356            .into_tuple()
357            .all(&txn)
358            .await?;
359        let dropped_tables = Table::find()
360            .find_also_related(Object)
361            .filter(
362                table::Column::TableId.is_in(
363                    removed_state_table_ids
364                        .iter()
365                        .copied()
366                        .collect::<HashSet<TableId>>(),
367                ),
368            )
369            .all(&txn)
370            .await?
371            .into_iter()
372            .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap())));
373        // delete all in to_drop_objects.
374        let res = Object::delete_many()
375            .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
376            .exec(&txn)
377            .await?;
378        if res.rows_affected == 0 {
379            return Err(MetaError::catalog_id_not_found(
380                object_type.as_str(),
381                object_id,
382            ));
383        }
384        let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;
385
386        txn.commit().await?;
387
388        // notify about them.
389        self.notify_users_update(user_infos).await;
390        inner
391            .dropped_tables
392            .extend(dropped_tables.map(|t| (t.id, t)));
393
394        let version = match object_type {
395            ObjectType::Database => {
396                // TODO: Notify objects in other databases when the cross-database query is supported.
397                self.notify_frontend(
398                    NotificationOperation::Delete,
399                    NotificationInfo::Database(PbDatabase {
400                        id: database_id,
401                        ..Default::default()
402                    }),
403                )
404                .await
405            }
406            ObjectType::Schema => {
407                let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
408                    .into_values()
409                    .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
410                let schema_obj = schema_obj
411                    .into_iter()
412                    .exactly_one()
413                    .expect("schema object not found");
414                to_notify_objs.push(schema_obj);
415
416                let relation_group = build_object_group_for_delete(to_notify_objs);
417                self.notify_frontend(NotificationOperation::Delete, relation_group)
418                    .await
419            }
420            _ => {
421                // Hummock observers and compactor observers are notified once the corresponding barrier is completed.
422                // They only need RelationInfo::Table.
423                let relation_group =
424                    build_object_group_for_delete(removed_objects.into_values().collect());
425                self.notify_frontend(NotificationOperation::Delete, relation_group)
426                    .await
427            }
428        };
429
430        Ok((
431            ReleaseContext {
432                database_id,
433                removed_streaming_job_ids,
434                removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
435                removed_source_ids: removed_source_ids.into_iter().collect(),
436                removed_secret_ids,
437                removed_source_fragments,
438                removed_actors,
439                removed_fragments,
440                removed_sink_fragment_by_targets,
441                removed_iceberg_table_sinks,
442            },
443            version,
444        ))
445    }
446
447    pub async fn try_abort_creating_subscription(
448        &self,
449        subscription_id: SubscriptionId,
450    ) -> MetaResult<()> {
451        let inner = self.inner.write().await;
452        let txn = inner.db.begin().await?;
453
454        let subscription = Subscription::find_by_id(subscription_id).one(&txn).await?;
455        let Some(subscription) = subscription else {
456            tracing::warn!(
457                %subscription_id,
458                "subscription not found when aborting creation, might be cleaned by recovery"
459            );
460            return Ok(());
461        };
462
463        if subscription.subscription_state == PbSubscriptionState::Created as i32 {
464            tracing::warn!(
465                %subscription_id,
466                "subscription is already created when aborting creation"
467            );
468            return Ok(());
469        }
470
471        subscription.delete(&txn).await?;
472        Ok(())
473    }
474}
475
476async fn report_drop_object(
477    object_type: ObjectType,
478    object_id: ObjectId,
479    txn: &DatabaseTransaction,
480) {
481    let connector_name = {
482        match object_type {
483            ObjectType::Sink => Sink::find_by_id(object_id.as_sink_id())
484                .select_only()
485                .column(sink::Column::Properties)
486                .into_tuple::<Property>()
487                .one(txn)
488                .await
489                .ok()
490                .flatten()
491                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
492            ObjectType::Source => Source::find_by_id(object_id.as_source_id())
493                .select_only()
494                .column(source::Column::WithProperties)
495                .into_tuple::<Property>()
496                .one(txn)
497                .await
498                .ok()
499                .flatten()
500                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
501            _ => unreachable!(),
502        }
503    };
504    if let Some(connector_name) = connector_name {
505        report_event(
506            PbTelemetryEventStage::DropStreamJob,
507            "source",
508            object_id.as_raw_id() as _,
509            Some(connector_name),
510            Some(match object_type {
511                ObjectType::Source => PbTelemetryDatabaseObject::Source,
512                ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
513                _ => unreachable!(),
514            }),
515            None,
516        );
517    }
518}