risingwave_batch_executors/executor/delete.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use futures_async_stream::try_stream;
18use itertools::Itertools;
19use risingwave_common::array::{
20    Array, ArrayBuilder, DataChunk, Op, PrimitiveArrayBuilder, StreamChunk,
21};
22use risingwave_common::catalog::{Field, Schema, TableId, TableVersionId};
23use risingwave_common::transaction::transaction_id::TxnId;
24use risingwave_common::types::DataType;
25use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
26use risingwave_dml::dml_manager::DmlManagerRef;
27use risingwave_pb::batch_plan::plan_node::NodeBody;
28
29use crate::error::{BatchError, Result};
30use crate::executor::{
31    BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
32};
33
/// [`DeleteExecutor`] implements table deletion with values from its child executor.
// Note: multiple `DELETE`s in a single epoch, or concurrent `DELETE`s may lead to conflicting
// records. This is validated and filtered on the first `Materialize`.
pub struct DeleteExecutor {
    /// Target table id.
    table_id: TableId,
    /// Version of the target table; checked when acquiring the DML handle.
    table_version_id: TableVersionId,
    /// Indices of the primary-key columns in the child's output. Used in
    /// `upsert` mode to decide which columns to keep vs. pad with NULLs.
    pk_indices: Vec<usize>,
    /// DML manager through which the table's write handle is obtained.
    dml_manager: DmlManagerRef,
    /// Child executor producing the rows to delete.
    child: BoxedExecutor,
    /// Maximum number of rows per chunk written to the table.
    chunk_size: usize,
    /// Output schema: the child's schema if `returning`, otherwise a single
    /// unnamed `Int64` field holding the affected-row count.
    schema: Schema,
    /// Identity string used for logging/debugging.
    identity: String,
    /// Whether to yield the deleted rows (`DELETE ... RETURNING`).
    returning: bool,
    /// Transaction id generated from the DML manager at construction time.
    txn_id: TxnId,
    /// Id of the session issuing this DML statement.
    session_id: u32,
    /// When true, non-pk columns are replaced with NULLs before writing
    /// the delete records.
    upsert: bool,
}
52
53impl DeleteExecutor {
54    pub fn new(
55        table_id: TableId,
56        table_version_id: TableVersionId,
57        pk_indices: Vec<usize>,
58        dml_manager: DmlManagerRef,
59        child: BoxedExecutor,
60        chunk_size: usize,
61        identity: String,
62        returning: bool,
63        session_id: u32,
64        upsert: bool,
65    ) -> Self {
66        let table_schema = child.schema().clone();
67        let txn_id = dml_manager.gen_txn_id();
68        Self {
69            table_id,
70            table_version_id,
71            pk_indices,
72            dml_manager,
73            child,
74            chunk_size,
75            schema: if returning {
76                table_schema
77            } else {
78                Schema {
79                    fields: vec![Field::unnamed(DataType::Int64)],
80                }
81            },
82            identity,
83            returning,
84            txn_id,
85            session_id,
86            upsert,
87        }
88    }
89}
90
impl Executor for DeleteExecutor {
    /// Returns the output schema computed at construction time: the table
    /// schema when `returning`, otherwise a single Int64 row-count column.
    fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Returns the executor's identity string for logging/debugging.
    fn identity(&self) -> &str {
        &self.identity
    }

    /// Consumes the executor and returns the output chunk stream.
    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute()
    }
}
104
impl DeleteExecutor {
    /// Streams the child's rows into the table's DML channel as `Delete`
    /// records inside a single transaction. Yields the deleted rows when
    /// `returning` is set, otherwise a single-row chunk with the row count.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        // Collect into a set for O(1) membership tests in the upsert padding loop below.
        let pk_indices: HashSet<_> = self.pk_indices.into_iter().collect();
        let data_types = self.child.schema().data_types();
        // Re-batches incoming rows into chunks of at most `chunk_size` rows.
        let mut builder = DataChunkBuilder::new(data_types, self.chunk_size);

        let table_dml_handle = self
            .dml_manager
            .table_dml_handle(self.table_id, self.table_version_id)?;
        // The child's output must match the table's non-generated columns exactly.
        assert_eq!(
            table_dml_handle
                .column_descs()
                .iter()
                .filter_map(|c| (!c.is_generated()).then_some(c.data_type.clone()))
                .collect_vec(),
            self.child.schema().data_types(),
            "bad delete schema"
        );
        let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?;

        // All writes for this statement happen in one transaction on the handle.
        write_handle.begin()?;

        // Transform the data chunk to a stream chunk, then write to the source.
        // NOTE: the closure mutably captures `write_handle`; each call is awaited
        // to completion before the next, so the borrows never overlap.
        let write_txn_data = |chunk: DataChunk| async {
            let cap = chunk.capacity();
            let stream_chunk = StreamChunk::from_parts(vec![Op::Delete; cap], chunk);

            #[cfg(debug_assertions)]
            table_dml_handle.check_chunk_schema(&stream_chunk);

            // `cardinality` (not `capacity`) is returned as the row count —
            // presumably visible rows only; confirm against `StreamChunk` docs.
            let cardinality = stream_chunk.cardinality();
            write_handle.write_chunk(stream_chunk).await?;

            Result::Ok(cardinality)
        };

        let mut rows_deleted = 0;

        #[for_await]
        for data_chunk in self.child.execute() {
            let mut data_chunk = data_chunk?;
            // For `RETURNING`, yield the full original rows before any upsert
            // padding rewrites the non-pk columns.
            if self.returning {
                yield data_chunk.clone();
            }
            if self.upsert {
                let (cols, vis) = data_chunk.into_parts();
                let cap = vis.len();
                let mut new_cols = Vec::with_capacity(cols.len());
                // Only keep the primary key columns, pad the rest with null.
                for (i, col) in cols.into_iter().enumerate() {
                    if pk_indices.contains(&i) {
                        new_cols.push(col);
                    } else {
                        let mut builder = col.create_builder(cap);
                        builder.append_n_null(cap);
                        new_cols.push(builder.finish().into());
                    }
                }
                data_chunk = DataChunk::new(new_cols, vis);
            }
            // The builder emits full chunks as they fill up; write each one.
            for chunk in builder.append_chunk(data_chunk) {
                rows_deleted += write_txn_data(chunk).await?;
            }
        }

        // Flush the final, possibly partially-filled chunk.
        if let Some(chunk) = builder.consume_all() {
            rows_deleted += write_txn_data(chunk).await?;
        }

        write_handle.end().await?;

        // create ret value: without `RETURNING`, emit a single-row chunk
        // holding the affected-row count.
        if !self.returning {
            let mut array_builder = PrimitiveArrayBuilder::<i64>::new(1);
            array_builder.append(Some(rows_deleted as i64));

            let array = array_builder.finish();
            let ret_chunk = DataChunk::new(vec![array.into_ref()], 1);

            yield ret_chunk
        }
    }
}
189
190impl BoxedExecutorBuilder for DeleteExecutor {
191    async fn new_boxed_executor(
192        source: &ExecutorBuilder<'_>,
193        inputs: Vec<BoxedExecutor>,
194    ) -> Result<BoxedExecutor> {
195        let [child]: [_; 1] = inputs.try_into().unwrap();
196        let delete_node = try_match_expand!(
197            source.plan_node().get_node_body().unwrap(),
198            NodeBody::Delete
199        )?;
200
201        let table_id = delete_node.table_id;
202
203        Ok(Box::new(Self::new(
204            table_id,
205            delete_node.table_version_id,
206            delete_node
207                .pk_indices
208                .iter()
209                .map(|&idx| idx as usize)
210                .collect(),
211            source.context().dml_manager(),
212            child,
213            source.context().get_config().developer.chunk_size,
214            source.plan_node().get_identity().clone(),
215            delete_node.returning,
216            delete_node.session_id,
217            delete_node.upsert,
218        )))
219    }
220}
221
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::StreamExt;
    use risingwave_common::catalog::{
        ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID, schema_test_utils,
    };
    use risingwave_common::test_prelude::DataChunkTestExt;
    use risingwave_dml::dml_manager::DmlManager;

    use super::*;
    use crate::executor::test_utils::MockExecutor;
    use crate::*;

    /// End-to-end test of [`DeleteExecutor`]: feeds five rows through a mock
    /// child, checks the returned affected-row count, and verifies the
    /// `Delete` stream chunk observed by the table's DML reader.
    #[tokio::test]
    async fn test_delete_executor() -> Result<()> {
        let dml_manager = Arc::new(DmlManager::for_test());

        // Schema for mock executor.
        let schema = schema_test_utils::ii();
        let mut mock_executor = MockExecutor::new(schema.clone());

        // Schema of the table (same two-int schema; intentionally shadows the one above).
        let schema = schema_test_utils::ii();

        mock_executor.add(DataChunk::from_pretty(
            "i  i
             1  2
             3  4
             5  6
             7  8
             9 10",
        ));

        // Create reader
        let table_id = TableId::new(0);
        let column_descs = schema
            .fields
            .iter()
            .enumerate()
            .map(|(i, field)| ColumnDesc::unnamed(ColumnId::new(i as _), field.data_type.clone()))
            .collect_vec();
        // We must create a variable to hold this `Arc<TableDmlHandle>` here, or it will be dropped
        // due to the `Weak` reference in `DmlManager`.
        let reader = dml_manager
            .register_reader(table_id, INITIAL_TABLE_VERSION_ID, &column_descs)
            .unwrap();
        let mut reader = reader.stream_reader().into_stream();

        // Delete
        let delete_executor = Box::new(DeleteExecutor::new(
            table_id,
            INITIAL_TABLE_VERSION_ID,
            vec![0],     // pk index
            dml_manager,
            Box::new(mock_executor),
            1024,        // chunk size
            "DeleteExecutor".to_owned(),
            false,       // returning
            0,           // session id
            false,       // upsert
        ));

        // Run the executor on a separate task so the DML reader below can
        // consume its writes concurrently.
        let handle = tokio::spawn(async move {
            let fields = &delete_executor.schema().fields;
            // `returning` is false, so the output schema is the Int64 row-count column.
            assert_eq!(fields[0].data_type, DataType::Int64);

            let mut stream = delete_executor.execute();
            let result = stream.next().await.unwrap().unwrap();

            assert_eq!(
                result.column_at(0).as_int64().iter().collect::<Vec<_>>(),
                vec![Some(5)] // deleted rows
            );
        });

        // Read: the DML stream is framed as Begin, data chunk(s), End.
        reader.next().await.unwrap()?.into_begin().unwrap();

        let txn_msg = reader.next().await.unwrap()?;
        let chunk = txn_msg.as_stream_chunk().unwrap();
        // All five rows arrive as `Delete` ops in a single chunk.
        assert_eq!(chunk.ops().to_vec(), vec![Op::Delete; 5]);

        assert_eq!(
            chunk.columns()[0].as_int32().iter().collect::<Vec<_>>(),
            vec![Some(1), Some(3), Some(5), Some(7), Some(9)]
        );

        assert_eq!(
            chunk.columns()[1].as_int32().iter().collect::<Vec<_>>(),
            vec![Some(2), Some(4), Some(6), Some(8), Some(10)]
        );

        reader.next().await.unwrap()?.into_end().unwrap();

        handle.await.unwrap();

        Ok(())
    }
}