// risingwave_meta/controller/catalog/drop_op.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::catalog::{ICEBERG_SINK_PREFIX, ICEBERG_SOURCE_PREFIX};
16use risingwave_pb::catalog::PbTable;
17use risingwave_pb::catalog::subscription::PbSubscriptionState;
18use risingwave_pb::telemetry::PbTelemetryDatabaseObject;
19use sea_orm::{ColumnTrait, DatabaseTransaction, EntityTrait, ModelTrait, QueryFilter};
20
21use super::*;
impl CatalogController {
    /// Drop a catalog object of any kind — database, schema, relation
    /// (table/index/source/sink/view/subscription), connection, function, or
    /// secret — inside a single transaction, together with every dependent
    /// object implied by `drop_mode`.
    ///
    /// Returns the `ReleaseContext` describing resources the caller must
    /// release (streaming jobs, state tables, sources, secrets, fragments,
    /// iceberg table sinks) and the notification version after frontends were
    /// notified of the deletion.
    ///
    /// Errors when the object does not exist, when a `Restrict` drop finds
    /// non-droppable referrers, or when the drop would violate a guarded
    /// invariant (iceberg table in the dependency set, protected incoming
    /// sinks, still-creating streaming jobs, cross-database references).
    pub async fn drop_object(
        &self,
        object_type: ObjectType,
        object_id: impl Into<ObjectId>,
        drop_mode: DropMode,
    ) -> MetaResult<(ReleaseContext, NotificationVersion)> {
        let object_id = object_id.into();
        let mut inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        // Resolve the target object; the caller-supplied type must match it.
        let obj: PartialObject = Object::find_by_id(object_id)
            .into_partial_model()
            .one(&txn)
            .await?
            .ok_or_else(|| MetaError::catalog_id_not_found(object_type.as_str(), object_id))?;
        assert_eq!(obj.obj_type, object_type);
        let drop_database = object_type == ObjectType::Database;
        // A database is its own database id; every other object must carry the
        // id of the database it belongs to.
        let database_id = if object_type == ObjectType::Database {
            object_id.as_database_id()
        } else {
            obj.database_id
                .ok_or_else(|| anyhow!("dropped object should have database_id"))?
        };

        // Check the cross-db dependency info to see if the subscription can be dropped.
        if obj.obj_type == ObjectType::Subscription {
            validate_subscription_deletion(&txn, object_id.as_subscription_id()).await?;
        }

        // Phase 1: collect the set of additional objects dropped alongside the
        // target. Cascade pulls in the whole referring closure; restrict only
        // tolerates referrers that are implicitly owned (indexes, iceberg
        // sinks) and rejects everything else.
        let mut removed_objects = match drop_mode {
            DropMode::Cascade => get_referring_objects_cascade(object_id, &txn).await?,
            DropMode::Restrict => match object_type {
                ObjectType::Database => unreachable!("database always be dropped in cascade mode"),
                ObjectType::Schema => {
                    ensure_schema_empty(object_id.as_schema_id(), &txn).await?;
                    Default::default()
                }
                ObjectType::Table => {
                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
                    let objects = get_referring_objects(object_id, &txn).await?;
                    // Emit telemetry for connector-backed referrers that are
                    // dropped implicitly with the table (best-effort, no `?`).
                    for obj in objects.iter().filter(|object| {
                        object.obj_type == ObjectType::Source || object.obj_type == ObjectType::Sink
                    }) {
                        report_drop_object(obj.obj_type, obj.oid, &txn).await;
                    }
                    assert!(
                        objects.iter().all(|obj| obj.obj_type == ObjectType::Index
                            || obj.obj_type == ObjectType::Sink),
                        "only index and iceberg sink could be dropped in restrict mode"
                    );
                    for obj in &objects {
                        check_object_refer_for_drop(obj.obj_type, obj.oid, &txn).await?;
                    }
                    objects
                }
                object_type @ (ObjectType::Source | ObjectType::Sink) => {
                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
                    report_drop_object(object_type, object_id, &txn).await;
                    vec![]
                }

                ObjectType::View
                | ObjectType::Index
                | ObjectType::Function
                | ObjectType::Connection
                | ObjectType::Subscription
                | ObjectType::Secret => {
                    check_object_refer_for_drop(object_type, object_id, &txn).await?;
                    vec![]
                }
            },
        };

        // check iceberg source.
        // An iceberg-engine table has a hidden companion source named
        // `{ICEBERG_SOURCE_PREFIX}{table_name}` in the same schema; if one
        // exists, drop it together with the table.
        if obj.obj_type == ObjectType::Table {
            let table_name = Table::find_by_id(object_id.as_table_id())
                .select_only()
                .column(table::Column::Name)
                .into_tuple::<String>()
                .one(&txn)
                .await?
                .ok_or_else(|| MetaError::catalog_id_not_found("table", object_id))?;
            let iceberg_source = Source::find()
                .inner_join(Object)
                .filter(
                    object::Column::DatabaseId
                        .eq(database_id)
                        .and(object::Column::SchemaId.eq(obj.schema_id.unwrap()))
                        .and(
                            source::Column::Name
                                .eq(format!("{}{}", ICEBERG_SOURCE_PREFIX, table_name)),
                        ),
                )
                .into_partial_model()
                .one(&txn)
                .await?;
            if let Some(iceberg_source) = iceberg_source {
                removed_objects.push(iceberg_source);
            }
        }

        // The target itself is removed last in the list; from here on
        // `removed_objects` / `removed_object_ids` are the full drop set.
        removed_objects.push(obj);
        let mut removed_object_ids: HashSet<_> =
            removed_objects.iter().map(|obj| obj.oid).collect();

        // TODO: record dependency info in object_dependency table for sink into table.
        // Special handling for 'sink into table'.
        // Sinks whose target table is being dropped are either rejected
        // (when protection is on) or added to the drop set.
        let incoming_sink_ids: Vec<SinkId> = Sink::find()
            .select_only()
            .column(sink::Column::SinkId)
            .filter(sink::Column::TargetTable.is_in(removed_object_ids.clone()))
            .into_tuple()
            .all(&txn)
            .await?;
        if !incoming_sink_ids.is_empty() {
            if self.env.opts.protect_drop_table_with_incoming_sink {
                let sink_names: Vec<String> = Sink::find()
                    .select_only()
                    .column(sink::Column::Name)
                    .filter(sink::Column::SinkId.is_in(incoming_sink_ids.clone()))
                    .into_tuple()
                    .all(&txn)
                    .await?;

                return Err(MetaError::permission_denied(format!(
                    "Table used by incoming sinks: {:?}, please drop them manually",
                    sink_names
                )));
            }

            let removed_sink_objs: Vec<PartialObject> = Object::find()
                .filter(object::Column::Oid.is_in(incoming_sink_ids))
                .into_partial_model()
                .all(&txn)
                .await?;

            removed_object_ids.extend(removed_sink_objs.iter().map(|obj| obj.oid));
            removed_objects.extend(removed_sink_objs);
        }

        // Reject dropping a sink whose target table survives the drop but has
        // not been migrated yet.
        // NOTE(review): the exact meaning of "migrated" is defined by
        // `has_table_been_migrated` elsewhere — confirm there.
        for obj in &removed_objects {
            if obj.obj_type == ObjectType::Sink {
                let sink = Sink::find_by_id(obj.oid.as_sink_id())
                    .one(&txn)
                    .await?
                    .ok_or_else(|| MetaError::catalog_id_not_found("sink", obj.oid))?;

                if let Some(target_table) = sink.target_table
                    && !removed_object_ids.contains(&target_table.as_object_id())
                    && !has_table_been_migrated(&txn, target_table).await?
                {
                    return Err(anyhow::anyhow!(
                        "Dropping sink into table is not allowed for unmigrated table {}. Please migrate it first.",
                        target_table
                    ).into());
                }
            }
        }

        // 1. Detect when an Iceberg table is part of the dependencies.
        // 2. Drop database with iceberg tables in it is not supported.
        // (Directly dropping a single iceberg table is handled elsewhere, so a
        // plain non-database table drop skips this scan.)
        if object_type != ObjectType::Table || drop_database {
            for obj in &removed_objects {
                // if the obj is iceberg engine table, bail out
                if obj.obj_type == ObjectType::Table {
                    let table = Table::find_by_id(obj.oid.as_table_id())
                        .one(&txn)
                        .await?
                        .ok_or_else(|| MetaError::catalog_id_not_found("table", obj.oid))?;
                    if matches!(table.engine, Some(table::Engine::Iceberg)) {
                        return Err(MetaError::permission_denied(format!(
                            "Found iceberg table in dependency: {}, please drop it manually",
                            table.name,
                        )));
                    }
                }
            }
        }

        // Phase 2: derive the id sets the release context needs.
        let removed_table_ids = removed_objects
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Table || obj.obj_type == ObjectType::Index)
            .map(|obj| obj.oid.as_table_id());

        // Iceberg table sinks are identified by their reserved name prefix.
        let removed_iceberg_table_sinks: Vec<PbSink> = Sink::find()
            .find_also_related(Object)
            .filter(
                sink::Column::SinkId
                    .is_in(removed_object_ids.clone())
                    .and(sink::Column::Name.like(format!("{}%", ICEBERG_SINK_PREFIX))),
            )
            .all(&txn)
            .await?
            .into_iter()
            .map(|(sink, obj)| ObjectModel(sink, obj.unwrap(), None).into())
            .collect();

        let removed_streaming_job_ids: Vec<JobId> = StreamingJob::find()
            .select_only()
            .column(streaming_job::Column::JobId)
            .filter(streaming_job::Column::JobId.is_in(removed_object_ids))
            .into_tuple()
            .all(&txn)
            .await?;

        // Check if there are any streaming jobs that are creating.
        // A single creating sink is tolerated (its creation gets cancelled);
        // any other creating job aborts the drop.
        if !removed_streaming_job_ids.is_empty() {
            let creating = StreamingJob::find()
                .filter(
                    streaming_job::Column::JobStatus
                        .ne(JobStatus::Created)
                        .and(streaming_job::Column::JobId.is_in(removed_streaming_job_ids.clone())),
                )
                .count(&txn)
                .await?;
            if creating != 0 {
                if creating == 1 && object_type == ObjectType::Sink {
                    info!("dropping creating sink job, it will be cancelled");
                } else {
                    return Err(MetaError::permission_denied(format!(
                        "can not drop {creating} creating streaming job, please cancel them firstly"
                    )));
                }
            }
        }

        let mut removed_state_table_ids: HashSet<_> = removed_table_ids.clone().collect();

        if !drop_database {
            // Add associated sources.
            // (A database drop already has them in the cascade set.)
            let removed_source_ids: Vec<SourceId> = Table::find()
                .select_only()
                .column(table::Column::OptionalAssociatedSourceId)
                .filter(
                    table::Column::TableId
                        .is_in(removed_table_ids)
                        .and(table::Column::OptionalAssociatedSourceId.is_not_null()),
                )
                .into_tuple()
                .all(&txn)
                .await?;
            let removed_source_objs: Vec<PartialObject> = Object::find()
                .filter(object::Column::Oid.is_in(removed_source_ids))
                .into_partial_model()
                .all(&txn)
                .await?;
            removed_objects.extend(removed_source_objs);
        }

        let removed_source_ids: HashSet<_> = removed_objects
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Source)
            .map(|obj| obj.oid.as_source_id())
            .collect();

        let removed_secret_ids = removed_objects
            .iter()
            .filter(|obj| obj.obj_type == ObjectType::Secret)
            .map(|obj| obj.oid.as_secret_id())
            .collect();

        // Internal (state) tables of the dropped streaming jobs are removed
        // and released together with the jobs.
        if !removed_streaming_job_ids.is_empty() {
            let removed_internal_table_objs: Vec<PartialObject> = Object::find()
                .select_only()
                .columns([
                    object::Column::Oid,
                    object::Column::ObjType,
                    object::Column::SchemaId,
                    object::Column::DatabaseId,
                ])
                .join(JoinType::InnerJoin, object::Relation::Table.def())
                .filter(table::Column::BelongsToJobId.is_in(removed_streaming_job_ids.clone()))
                .into_partial_model()
                .all(&txn)
                .await?;

            removed_state_table_ids.extend(
                removed_internal_table_objs
                    .iter()
                    .map(|obj| obj.oid.as_table_id()),
            );
            removed_objects.extend(removed_internal_table_objs);
        }

        // Deduplicate by oid; from here `removed_objects` is keyed by object id.
        let removed_objects: HashMap<_, _> = removed_objects
            .into_iter()
            .map(|obj| (obj.oid, obj))
            .collect();

        // TODO: Support drop cascade for cross-database query.
        for obj in removed_objects.values() {
            if let Some(obj_database_id) = obj.database_id
                && obj_database_id != database_id
            {
                return Err(MetaError::permission_denied(format!(
                    "Referenced by other objects in database {obj_database_id}, please drop them manually"
                )));
            }
        }

        // Phase 3: resolve fragments of the dropped jobs, and for dropped
        // sinks map each surviving downstream (target) fragment to the sink
        // fragments that fed it.
        let (removed_source_fragments, removed_sink_fragments, removed_fragments) =
            get_fragments_for_jobs(&txn, removed_streaming_job_ids.clone()).await?;

        let sink_target_fragments = fetch_target_fragments(&txn, removed_sink_fragments).await?;
        let mut removed_sink_fragment_by_targets = HashMap::new();
        for (sink_fragment, target_fragments) in sink_target_fragments {
            assert!(
                target_fragments.len() <= 1,
                "sink should have at most one downstream fragment"
            );
            if let Some(target_fragment) = target_fragments.first()
                && !removed_fragments.contains(target_fragment)
            {
                removed_sink_fragment_by_targets
                    .entry(*target_fragment)
                    .or_insert_with(Vec::new)
                    .push(sink_fragment);
            }
        }

        // Find affect users with privileges on all this objects.
        let updated_user_ids: Vec<UserId> = UserPrivilege::find()
            .select_only()
            .distinct()
            .column(user_privilege::Column::UserId)
            .filter(user_privilege::Column::Oid.is_in(removed_objects.keys().cloned()))
            .into_tuple()
            .all(&txn)
            .await?;
        // Snapshot the dropped state tables before deletion so they can be
        // recorded in `inner.dropped_tables` after commit.
        let dropped_tables = Table::find()
            .find_also_related(Object)
            .filter(
                table::Column::TableId.is_in(
                    removed_state_table_ids
                        .iter()
                        .copied()
                        .collect::<HashSet<TableId>>(),
                ),
            )
            .all(&txn)
            .await?
            .into_iter()
            .map(|(table, obj)| PbTable::from(ObjectModel(table, obj.unwrap(), None)));
        // delete all in to_drop_objects.
        let res = Object::delete_many()
            .filter(object::Column::Oid.is_in(removed_objects.keys().cloned()))
            .exec(&txn)
            .await?;
        if res.rows_affected == 0 {
            return Err(MetaError::catalog_id_not_found(
                object_type.as_str(),
                object_id,
            ));
        }
        let user_infos = list_user_info_by_ids(updated_user_ids, &txn).await?;

        txn.commit().await?;

        // notify about them.
        self.notify_users_update(user_infos).await;
        inner
            .dropped_tables
            .extend(dropped_tables.map(|t| (t.id, t)));

        // Phase 4: notify frontends of the deletion; shape of the
        // notification depends on what was dropped.
        let version = match object_type {
            ObjectType::Database => {
                // TODO: Notify objects in other databases when the cross-database query is supported.
                self.notify_frontend(
                    NotificationOperation::Delete,
                    NotificationInfo::Database(PbDatabase {
                        id: database_id,
                        ..Default::default()
                    }),
                )
                .await
            }
            ObjectType::Schema => {
                // Notify the schema object last, after all of its contents.
                let (schema_obj, mut to_notify_objs): (Vec<_>, Vec<_>) = removed_objects
                    .into_values()
                    .partition(|obj| obj.obj_type == ObjectType::Schema && obj.oid == object_id);
                let schema_obj = schema_obj
                    .into_iter()
                    .exactly_one()
                    .expect("schema object not found");
                to_notify_objs.push(schema_obj);

                let relation_group = build_object_group_for_delete(to_notify_objs);
                self.notify_frontend(NotificationOperation::Delete, relation_group)
                    .await
            }
            _ => {
                // Hummock observers and compactor observers are notified once the corresponding barrier is completed.
                // They only need RelationInfo::Table.
                let relation_group =
                    build_object_group_for_delete(removed_objects.into_values().collect());
                self.notify_frontend(NotificationOperation::Delete, relation_group)
                    .await
            }
        };

        Ok((
            ReleaseContext {
                database_id,
                removed_streaming_job_ids,
                removed_state_table_ids: removed_state_table_ids.into_iter().collect(),
                removed_source_ids: removed_source_ids.into_iter().collect(),
                removed_secret_ids,
                removed_source_fragments,
                removed_fragments,
                removed_sink_fragment_by_targets,
                removed_iceberg_table_sinks,
            },
            version,
        ))
    }

    /// Abort a subscription that is still being created by deleting its row.
    ///
    /// Idempotent: returns `Ok(())` without touching anything when the
    /// subscription no longer exists (e.g. cleaned up by recovery) or has
    /// already reached the `Created` state.
    pub async fn try_abort_creating_subscription(
        &self,
        subscription_id: SubscriptionId,
    ) -> MetaResult<()> {
        let inner = self.inner.write().await;
        let txn = inner.db.begin().await?;

        let subscription = Subscription::find_by_id(subscription_id).one(&txn).await?;
        let Some(subscription) = subscription else {
            tracing::warn!(
                %subscription_id,
                "subscription not found when aborting creation, might be cleaned by recovery"
            );
            return Ok(());
        };

        if subscription.subscription_state == PbSubscriptionState::Created as i32 {
            tracing::warn!(
                %subscription_id,
                "subscription is already created when aborting creation"
            );
            return Ok(());
        }

        subscription.delete(&txn).await?;
        txn.commit().await?;
        Ok(())
    }
}
470
471async fn report_drop_object(
472    object_type: ObjectType,
473    object_id: ObjectId,
474    txn: &DatabaseTransaction,
475) {
476    let connector_name = {
477        match object_type {
478            ObjectType::Sink => Sink::find_by_id(object_id.as_sink_id())
479                .select_only()
480                .column(sink::Column::Properties)
481                .into_tuple::<Property>()
482                .one(txn)
483                .await
484                .ok()
485                .flatten()
486                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
487            ObjectType::Source => Source::find_by_id(object_id.as_source_id())
488                .select_only()
489                .column(source::Column::WithProperties)
490                .into_tuple::<Property>()
491                .one(txn)
492                .await
493                .ok()
494                .flatten()
495                .and_then(|properties| properties.inner_ref().get("connector").cloned()),
496            _ => unreachable!(),
497        }
498    };
499    if let Some(connector_name) = connector_name {
500        report_event(
501            PbTelemetryEventStage::DropStreamJob,
502            "source",
503            object_id.as_raw_id() as _,
504            Some(connector_name),
505            Some(match object_type {
506                ObjectType::Source => PbTelemetryDatabaseObject::Source,
507                ObjectType::Sink => PbTelemetryDatabaseObject::Sink,
508                _ => unreachable!(),
509            }),
510            None,
511        );
512    }
513}