// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
use risingwave_pb::catalog::PbTable;
use risingwave_pb::catalog::subscription::PbSubscriptionState;
use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter};

use super::*;

22impl CatalogController {
23    // Drop all kinds of objects including databases,
24    // schemas, relations, connections, functions, etc.
25    pub async fn drop_object(
26        &self,
27        object_type: ObjectType,
28        object_id: impl Into<ObjectId>,
29        drop_mode: DropMode,
30    ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
31        let object_id = object_id.into();
32        let mut inner = self.inner.write().await;
33        let txn = inner.db.begin().await?;
34
35        let obj: PartialObject = Object::find_by_id(object_id)
36            .into_partial_model()
37            .one(&txn)
38            .await?
39            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
40        assert_eq!(obj.obj_type, object_type);
41        let drop_database = object_type == ObjectType::Database;
42        let database_id = if object_type == ObjectType::Database {
43            object_id.as_database_id()
44        } else {
45            obj.database_id
46                .ok_or_else(|| anyhow!("dropped object should have database_id"))?
47        };
48
49        // Check the cross-db dependency info to see if the subscription can be dropped.
50        if obj.obj_type == ObjectType::Subscription {
51            validate_subscription_deletion(&txn, object_id.as_subscription_id()).await?;
52        }
53
54        let mut removed_objects = match drop_mode {
55            DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
56            DropMode::Restrict => match object_type {
57                ObjectType::Database => unreachable!("database always be dropped in cascade mode"),
58                ObjectType::Schema => {
59                    ensure_schema_empty(object_id.as_schema_id(), &txn).await?;
60                    Default::default()
61                }
62                ObjectType::Table => {
63                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
64                    let objects = get_referring_objects(object_id, &txn).await?;
65                    for obj in objects.iter().filter(|object| {
66                        object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
67                    }) {
68                        report_drop_object(obj.obj_type, obj.oid, &txn).await;
69                    }
70                    assert!(
71                        objects.iter().all(|obj| obj.obj_type == ObjectType::Index
72                            || obj.obj_type == ObjectType::Sink),
73                        "only index and iceberg sink could be dropped in restrict mode"
74                    );
75                    for obj in &objects {
76                        check_object_refer_for_drop(obj.obj_type, obj.oid, &txn).await?;
77                    }
78                    objects
79                }
80                object_type @ (ObjectType::Source | ObjectType::Sink) => {
81                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
82                    report_drop_object(object_type, object_id, &txn).await;
83                    vec![]
84                }
85
86                ObjectType::View
87                | ObjectType::Index
88                | ObjectType::Function
89                | ObjectType::Connection
90                | ObjectType::Subscription
91                | ObjectType::Secret => {
92                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
93                    vec![]
94                }
95            },
96        };
97
98        // check iceberg source.
99        if obj.obj_type == ObjectType::Table {
100            let table_name = Table::find_by_id(object_id.as_table_id())
101                .select_only()
102                .column(table::Column::Name)
103                .into_tuple::<String>()
104                .one(&txn)
105                .await?
106                .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
107            let iceberg_source = Source::find()
108                .inner_join(Object)
109                .filter(
110                    object::Column::DatabaseId
111                        .eq(database_id)
112                        .and(object::Column::SchemaId.eq(obj.schema_id.unwrap()))
113                        .and(
114                            source::Column::Name
115                                .eq(format!("{}{}", ICEBERG_SOURCE_PREFIX, table_name)),
116                        ),
117                )
118                .into_partial_model()
119                .one(&txn)
120                .await?;
121            if let Some(iceberg_source) = iceberg_source {
122                removed_objects.push(iceberg_source);
123            }
124        }
125
126        removed_objects.push(obj);
127        let mut removed_object_ids: HashSet<_> =
128            removed_objects.iter().map(|obj| obj.oid).collect();
129
130        // TODO: record dependency info in object_dependency table for sink into table.
131        // Special handling for 'sink into table'.
132        let incoming_sink_ids: Vec<SinkId> = Sink::find()
133            .select_only()
134            .column(sink::Column::SinkId)
135            .filter(sink::Column::TargetTable.is_in(removed_object_ids.clone()))
136            .into_tuple()
137            .all(&txn)
138            .await?;
139        if !incoming_sink_ids.is_empty() {
140            if self.env.opts.protect_drop_table_with_incoming_sink {
141                let sink_names: Vec<String> = Sink::find()
142                    .select_only()
143                    .column(sink::Column::Name)
144                    .filter(sink::Column::SinkId.is_in(incoming_sink_ids.clone()))
145                    .into_tuple()
146                    .all(&txn)
147                    .await?;
148
149                return Err(MetaError::permission_denied(format!(
150                    "Table used by incoming sinks: {:?}, please drop them manually",
151                    sink_names
152                )));
153            }
154
155            let removed_sink_objs: Vec<PartialObject> = Object::find()
156                .filter(object::Column::Oid.is_in(incoming_sink_ids))
157                .into_partial_model()
158                .all(&txn)
159                .await?;
160
161            removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
162            removed_objects.extend(removed_sink_objs);
163        }
164
165        for obj in &removed_objects {
166            if obj.obj_type == ObjectType::Sink {
167                let sink = Sink::find_by_id(obj.oid.as_sink_id())
168                    .one(&txn)
169                    .await?
170                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;
171
172                if let Some(target_table) = sink.target_table
173                    && !removed_object_ids.contains(&target_table.as_object_id())
174                    && !has_table_been_migrated(&txn, target_table).await?
175                {
176                    return Err(anyhow::anyhow!(
177                        "Dropping sink into table is not allowed for unmigrated table {}. Please migrate it first.",
178                        target_table
179                    ).into());
180                }
181            }
182        }
183
184        // 1. Detect when an Iceberg table is part of the dependencies.
185        // 2. Drop database with iceberg tables in it is not supported.
186        if object_type != ObjectType::Table || drop_database {
187            for obj in &removed_objects {
188                // if the obj is iceberg engine table, bail out
189                if obj.obj_type == ObjectType::Table {
190                    let table = Table::find_by_id(obj.oid.as_table_id())
191                        .one(&txn)
192                        .await?
193                        .ok_or_else(|| MetaError::catalog_id_not_found("table", obj.oid))?;
194                    if matches!(table.engine, Some(table::Engine::Iceberg)) {
195                        return Err(MetaError::permission_denied(format!(
196                            "Found iceberg table in dependency: {}, please drop it manually",
197                            table.name,
198                        )));
199                    }
200                }
201            }
202        }
203
204        let removed_table_ids = removed_objects
205            .iter()
206            .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
207            .map(|obj| obj.oid.as_table_id());
208
209        let removed_iceberg_table_sinks: Vec<PbSink> = Sink::find()
210            .find_also_related(Object)
211            .filter(
212                sink::Column::SinkId
213                    .is_in(removed_object_ids.clone())
214                    .and(sink::Column::Name.like(format!("{}%", ICEBERG_SINK_PREFIX))),
215            )
216            .all(&txn)
217            .await?
218            .into_iter()
219            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap(), None).into())
220            .collect();
221
222        // Collect Iceberg V3 sink ids among dropped sinks so the V3 sink manager
223        // can tear down their per-sink commit workers. Unlike the iceberg-table
224        // cleanup above, V3 sinks are user-created with arbitrary names, so we
225        // identify them by inspecting properties rather than by name prefix.
226        let removed_iceberg_v3_sink_ids: Vec<SinkId> = Sink::find()
227            .filter(sink::Column::SinkId.is_in(removed_object_ids.clone()))
228            .all(&txn)
229            .await?
230            .into_iter()
231            .filter_map(|sink| {
232                crate::manager::iceberg_v3_sink::is_iceberg_v3_sink(sink.properties.inner_ref())
233                    .then_some(sink.sink_id)
234            })
235            .collect();
236
237        let removed_streaming_job_ids: Vec<JobId> = StreamingJob::find()
238            .select_only()
239            .column(streaming_job::Column::JobId)
240            .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
241            .into_tuple()
242            .all(&txn)
243            .await?;
244
245        // Check if there are any streaming jobs that are creating.
246        if !removed_streaming_job_ids.is_empty() {
247            let creating = StreamingJob::find()
248                .filter(
249                    streaming_job::Column::JobStatus
250                        .ne(JobStatus::Created)
251                        .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
252                )
253                .count(&txn)
254                .await?;
255            if creating != 0 {
256                if creating == 1 && object_type == ObjectType::Sink {
257                    info!("dropping creating sink job, it will be cancelled");
258                } else {
259                    return Err(MetaError::permission_denied(format!(
260                        "can not drop {creating} creating streaming job, please cancel them firstly"
261                    )));
262                }
263            }
264        }
265
266        let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();
267
268        if !drop_database {
269            // Add associated sources.
270            let removed_source_ids: Vec<SourceId> = Table::find()
271                .select_only()
272                .column(table::Column::OptionalAssociatedSourceId)
273                .filter(
274                    table::Column::TableId
275                        .is_in(removed_table_ids)
276                        .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
277                )
278                .into_tuple()
279                .all(&txn)
280                .await?;
281            let removed_source_objs: Vec<PartialObject> = Object::find()
282                .filter(object::Column::Oid.is_in(removed_source_ids))
283                .into_partial_model()
284                .all(&txn)
285                .await?;
286            removed_objects.extend(removed_source_objs);
287        }
288
289        let removed_source_ids: HashSet<_> = removed_objects
290            .iter()
291            .filter(|obj| obj.obj_type == ObjectType::Source)
292            .map(|obj| obj.oid.as_source_id())
293            .collect();
294
295        let removed_secret_ids = removed_objects
296            .iter()
297            .filter(|obj| obj.obj_type == ObjectType::Secret)
298            .map(|obj| obj.oid.as_secret_id())
299            .collect();
300
301        if !removed_streaming_job_ids.is_empty() {
302            let removed_internal_table_objs: Vec<PartialObject> = Object::find()
303                .select_only()
304                .columns([
305                    object::Column::Oid,
306                    object::Column::ObjType,
307                    object::Column::SchemaId,
308                    object::Column::DatabaseId,
309                ])
310                .join(JoinType::InnerJoin, object::Relation::Table.def())
311                .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
312                .into_partial_model()
313                .all(&txn)
314                .await?;
315
316            removed_state_table_ids.extend(
317                removed_internal_table_objs
318                    .iter()
319                    .map(|obj| obj.oid.as_table_id()),
320            );
321            removed_objects.extend(removed_internal_table_objs);
322        }
323
324        let removed_objects: HashMap<_, _> = removed_objects
325            .into_iter()
326            .map(|obj| (obj.oid, obj))
327            .collect();
328
329        // TODO: Support drop cascade for cross-database query.
330        for obj in removed_objects.values() {
331            if let Some(obj_database_id) = obj.database_id
332                && obj_database_id != database_id
333            {
334                return Err(MetaError::permission_denied(format!(
335                    "Referenced by other objects in database {obj_database_id}, please drop them manually"
336                )));
337            }
338        }
339
340        let (removed_source_fragments, removed_sink_fragments, removed_fragments) =
341            get_fragments_for_jobs(&txn, removed_streaming_job_ids.clone()).await?;
342
343        let sink_target_fragments = fetch_target_fragments(&txn, removed_sink_fragments).await?;
344        let mut removed_sink_fragment_by_targets = HashMap::new();
345        for (sink_fragment, target_fragments) in sink_target_fragments {
346            assert!(
347                target_fragments.len() <= 1,
348                "sink should have at most one downstream fragment"
349            );
350            if let Some(target_fragment) = target_fragments.first()
351                && !removed_fragments.contains(target_fragment)
352            {
353                removed_sink_fragment_by_targets
354                    .entry(*target_fragment)
355                    .or_insert_with(Vec::new)
356                    .push(sink_fragment);
357            }
358        }
359
360        // Find affect users with privileges on all this objects.
361        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
362            .select_only()
363            .distinct()
364            .column(user_privilege::Column::UserId)
365            .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
366            .into_tuple()
367            .all(&txn)
368            .await?;
369        let dropped_tables = Table::find()
370            .find_also_related(Object)
371            .filter(
372                table::Column::TableId.is_in(
373                    removed_state_table_ids
374                        .iter()
375                        .copied()
376                        .collect::<HashSet<TableId>>(),
377                ),
378            )
379            .all(&txn)
380            .await?
381            .into_iter()
382            .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)));
383        // delete all in to_drop_objects.
384        let res = Object::delete_many()
385            .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
386            .exec(&txn)
387            .await?;
388        if res.rows_affected == 0 {
389            return Err(MetaError::catalog_id_not_found(
390                object_type.as_str(),
391                object_id,
392            ));
393        }
394        let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;
395
396        txn.commit().await?;
397
398        // notify about them.
399        self.notify_users_update(user_infos).await;
400        inner
401            .dropped_tables
402            .extend(dropped_tables.map(|t| (t.id, t)));
403
404        let version = match object_type {
405            ObjectType::Database => {
406                // TODO: Notify objects in other databases when the cross-database query is supported.
407                self.notify_frontend(
408                    NotificationOperation::Delete,
409                    NotificationInfo::Database(PbDatabase {
410                        id: database_id,
411                        ..Default::default()
412                    }),
413                )
414                .await
415            }
416            ObjectType::Schema => {
417                let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
418                    .into_values()
419                    .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
420                let schema_obj = Itertools::exactly_one(schema_obj.into_iter())
421                    .expect("schema object not found");
422                to_notify_objs.push(schema_obj);
423
424                let relation_group = build_object_group_for_delete(to_notify_objs);
425                self.notify_frontend(NotificationOperation::Delete, relation_group)
426                    .await
427            }
428            _ => {
429                // Hummock observers and compactor observers are notified once the corresponding barrier is completed.
430                // They only need RelationInfo::Table.
431                let relation_group =
432                    build_object_group_for_delete(removed_objects.into_values().collect());
433                self.notify_frontend(NotificationOperation::Delete, relation_group)
434                    .await
435            }
436        };
437
438        Ok((
439            ReleaseContext {
440                database_id,
441                removed_streaming_job_ids,
442                removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
443                removed_source_ids: removed_source_ids.into_iter().collect(),
444                removed_secret_ids,
445                removed_source_fragments,
446                removed_fragments,
447                removed_sink_fragment_by_targets,
448                removed_iceberg_table_sinks,
449                removed_iceberg_v3_sink_ids,
450            },
451            version,
452        ))
453    }
454
455    pub async fn try_abort_creating_subscription(
456        &self,
457        subscription_id: SubscriptionId,
458    ) -> MetaResult<()> {
459        let inner = self.inner.write().await;
460        let txn = inner.db.begin().await?;
461
462        let subscription = Subscription::find_by_id(subscription_id).one(&txn).await?;
463        let Some(subscription) = subscription else {
464            tracing::warn!(
465                %subscription_id,
466                "subscription not found when aborting creation, might be cleaned by recovery"
467            );
468            return Ok(());
469        };
470
471        if subscription.subscription_state == PbSubscriptionState::Created as i32 {
472            tracing::warn!(
473                %subscription_id,
474                "subscription is already created when aborting creation"
475            );
476            return Ok(());
477        }
478
479        subscription.delete(&txn).await?;
480        txn.commit().await?;
481        Ok(())
482    }
483}
484
485async fn report_drop_object(
486    object_type: ObjectType,
487    object_id: ObjectId,
488    txn: &DatabaseTransaction,
489) {
490    let connector_name = {
491        match object_type {
492            ObjectType::Sink => Sink::find_by_id(object_id.as_sink_id())
493                .select_only()
494                .column(sink::Column::Properties)
495                .into_tuple::<Property>()
496                .one(txn)
497                .await
498                .ok()
499                .flatten()
500                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
501            ObjectType::Source => Source::find_by_id(object_id.as_source_id())
502                .select_only()
503                .column(source::Column::WithProperties)
504                .into_tuple::<Property>()
505                .one(txn)
506                .await
507                .ok()
508                .flatten()
509                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
510            _ => unreachable!(),
511        }
512    };
513    if let Some(connector_name) = connector_name {
514        report_event(
515            PbTelemetryEventStage::DropStreamJob,
516            "source",
517            object_id.as_raw_id() as _,
518            Some(connector_name),
519            Some(match object_type {
520                ObjectType::Source => PbTelemetryDatabaseObject::Source,
521                ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
522                _ => unreachable!(),
523            }),
524            None,
525        );
526    }
527}