// risingwave_batch_executors/executor/delete.rs

use std::collections::HashSet;
16
17use futures_async_stream::try_stream;
18use itertools::Itertools;
19use risingwave_common::array::{
20 Array, ArrayBuilder, DataChunk, Op, PrimitiveArrayBuilder, StreamChunk,
21};
22use risingwave_common::catalog::{Field, Schema, TableId, TableVersionId};
23use risingwave_common::transaction::transaction_id::TxnId;
24use risingwave_common::types::DataType;
25use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
26use risingwave_dml::dml_manager::DmlManagerRef;
27use risingwave_pb::batch_plan::plan_node::NodeBody;
28
29use crate::error::{BatchError, Result};
30use crate::executor::{
31 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
32};
33
/// Batch executor that deletes rows from a table by writing `Delete` ops
/// through the DML manager.
pub struct DeleteExecutor {
    /// Id of the table to delete from.
    table_id: TableId,
    /// Version of the table, used when acquiring the DML handle.
    table_version_id: TableVersionId,
    /// Indices of the primary-key columns; in `upsert` mode all other
    /// columns are nulled out before the chunk is written.
    pk_indices: Vec<usize>,
    dml_manager: DmlManagerRef,
    /// Child executor producing the rows to delete.
    child: BoxedExecutor,
    /// Row capacity of the internal `DataChunkBuilder`.
    chunk_size: usize,
    /// Output schema: the child's schema when `returning` is set, otherwise
    /// a single unnamed `Int64` column carrying the affected-row count.
    schema: Schema,
    identity: String,
    /// Whether `DELETE ... RETURNING` was specified.
    returning: bool,
    /// Transaction id generated from the DML manager at construction time.
    txn_id: TxnId,
    session_id: u32,
    /// Whether to write in upsert mode: non-pk columns are nulled before
    /// the delete is issued.
    upsert: bool,
}
52
53impl DeleteExecutor {
54 pub fn new(
55 table_id: TableId,
56 table_version_id: TableVersionId,
57 pk_indices: Vec<usize>,
58 dml_manager: DmlManagerRef,
59 child: BoxedExecutor,
60 chunk_size: usize,
61 identity: String,
62 returning: bool,
63 session_id: u32,
64 upsert: bool,
65 ) -> Self {
66 let table_schema = child.schema().clone();
67 let txn_id = dml_manager.gen_txn_id();
68 Self {
69 table_id,
70 table_version_id,
71 pk_indices,
72 dml_manager,
73 child,
74 chunk_size,
75 schema: if returning {
76 table_schema
77 } else {
78 Schema {
79 fields: vec![Field::unnamed(DataType::Int64)],
80 }
81 },
82 identity,
83 returning,
84 txn_id,
85 session_id,
86 upsert,
87 }
88 }
89}
90
impl Executor for DeleteExecutor {
    /// Returns the output schema (count column or child schema — see `new`).
    fn schema(&self) -> &Schema {
        &self.schema
    }

    fn identity(&self) -> &str {
        &self.identity
    }

    /// Consumes the executor and returns the delete stream.
    fn execute(self: Box<Self>) -> BoxedDataChunkStream {
        self.do_execute()
    }
}
104
impl DeleteExecutor {
    /// Streams the child's rows into the table's DML channel as `Delete`
    /// ops, wrapped in a single begin/end transaction.
    ///
    /// When `returning` is set, each input chunk is yielded back to the
    /// caller (before any upsert rewriting); otherwise a single one-row
    /// chunk holding the number of deleted rows is yielded at the end.
    #[try_stream(boxed, ok = DataChunk, error = BatchError)]
    async fn do_execute(self: Box<Self>) {
        // HashSet for O(1) membership checks in the upsert column loop.
        let pk_indices: HashSet<_> = self.pk_indices.into_iter().collect();
        let data_types = self.child.schema().data_types();
        let mut builder = DataChunkBuilder::new(data_types, self.chunk_size);

        let table_dml_handle = self
            .dml_manager
            .table_dml_handle(self.table_id, self.table_version_id)?;
        // The child must produce exactly the table's non-generated columns,
        // in order; anything else is a planner bug.
        assert_eq!(
            table_dml_handle
                .column_descs()
                .iter()
                .filter_map(|c| (!c.is_generated()).then_some(c.data_type.clone()))
                .collect_vec(),
            self.child.schema().data_types(),
            "bad delete schema"
        );
        let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?;

        write_handle.begin()?;

        // Converts a data chunk into a stream chunk of all-`Delete` ops,
        // writes it through the DML handle, and returns its cardinality
        // (visible-row count) so the caller can accumulate the total.
        let write_txn_data = |chunk: DataChunk| async {
            let cap = chunk.capacity();
            let stream_chunk = StreamChunk::from_parts(vec![Op::Delete; cap], chunk);

            #[cfg(debug_assertions)]
            table_dml_handle.check_chunk_schema(&stream_chunk);

            let cardinality = stream_chunk.cardinality();
            write_handle.write_chunk(stream_chunk).await?;

            Result::Ok(cardinality)
        };

        let mut rows_deleted = 0;

        #[for_await]
        for data_chunk in self.child.execute() {
            let mut data_chunk = data_chunk?;
            // `RETURNING` echoes the original rows, so yield before the
            // upsert rewrite below can null out columns.
            if self.returning {
                yield data_chunk.clone();
            }
            if self.upsert {
                // In upsert mode, keep only the primary-key columns and
                // replace every other column with all-null arrays of the
                // same capacity. (Presumably the pk alone identifies the
                // row to delete downstream — NOTE(review): confirm.)
                let (cols, vis) = data_chunk.into_parts();
                let cap = vis.len();
                let mut new_cols = Vec::with_capacity(cols.len());
                for (i, col) in cols.into_iter().enumerate() {
                    if pk_indices.contains(&i) {
                        new_cols.push(col);
                    } else {
                        // Shadows the outer chunk builder intentionally;
                        // this one is a per-column array builder.
                        let mut builder = col.create_builder(cap);
                        builder.append_n_null(cap);
                        new_cols.push(builder.finish().into());
                    }
                }
                data_chunk = DataChunk::new(new_cols, vis);
            }
            // The builder re-chunks input to `chunk_size`; it may emit
            // zero or more full chunks per appended input chunk.
            for chunk in builder.append_chunk(data_chunk) {
                rows_deleted += write_txn_data(chunk).await?;
            }
        }

        // Flush any partially-filled chunk left in the builder.
        if let Some(chunk) = builder.consume_all() {
            rows_deleted += write_txn_data(chunk).await?;
        }

        write_handle.end().await?;

        // Without `RETURNING`, emit a single row with the delete count.
        if !self.returning {
            let mut array_builder = PrimitiveArrayBuilder::<i64>::new(1);
            array_builder.append(Some(rows_deleted as i64));

            let array = array_builder.finish();
            let ret_chunk = DataChunk::new(vec![array.into_ref()], 1);

            yield ret_chunk
        }
    }
}
189
impl BoxedExecutorBuilder for DeleteExecutor {
    /// Builds a `DeleteExecutor` from a `Delete` plan node and its single
    /// child executor (the rows to delete).
    async fn new_boxed_executor(
        source: &ExecutorBuilder<'_>,
        inputs: Vec<BoxedExecutor>,
    ) -> Result<BoxedExecutor> {
        // Exactly one child is expected; more or fewer is a planner bug.
        let [child]: [_; 1] = inputs.try_into().unwrap();
        let delete_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::Delete
        )?;

        let table_id = delete_node.table_id;

        Ok(Box::new(Self::new(
            table_id,
            delete_node.table_version_id,
            // Plan-node pk indices come as protobuf integers; widen to usize.
            delete_node
                .pk_indices
                .iter()
                .map(|&idx| idx as usize)
                .collect(),
            source.context().dml_manager(),
            child,
            source.context().get_config().developer.chunk_size,
            source.plan_node().get_identity().clone(),
            delete_node.returning,
            delete_node.session_id,
            delete_node.upsert,
        )))
    }
}
221
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::StreamExt;
    use risingwave_common::catalog::{
        ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID, schema_test_utils,
    };
    use risingwave_common::test_prelude::DataChunkTestExt;
    use risingwave_dml::dml_manager::DmlManager;

    use super::*;
    use crate::executor::test_utils::MockExecutor;
    use crate::*;

    /// End-to-end check: the executor writes Begin, one all-`Delete` chunk
    /// with the child's rows, then End — and reports the deleted-row count.
    #[tokio::test]
    async fn test_delete_executor() -> Result<()> {
        let dml_manager = Arc::new(DmlManager::for_test());

        // Two-int32-column schema shared by the mock child and the table.
        // (Fix: this was previously declared twice; the second, identical
        // `let schema = schema_test_utils::ii();` was dead code.)
        let schema = schema_test_utils::ii();
        let mut mock_executor = MockExecutor::new(schema.clone());

        mock_executor.add(DataChunk::from_pretty(
            "i i
             1 2
             3 4
             5 6
             7 8
             9 10",
        ));

        let table_id = TableId::new(0);
        let column_descs = schema
            .fields
            .iter()
            .enumerate()
            .map(|(i, field)| ColumnDesc::unnamed(ColumnId::new(i as _), field.data_type.clone()))
            .collect_vec();
        // Register a reader so the test can observe what the executor writes.
        let reader = dml_manager
            .register_reader(table_id, INITIAL_TABLE_VERSION_ID, &column_descs)
            .unwrap();
        let mut reader = reader.stream_reader().into_stream();

        // returning = false, upsert = false: expect a single count row.
        let delete_executor = Box::new(DeleteExecutor::new(
            table_id,
            INITIAL_TABLE_VERSION_ID,
            vec![0],
            dml_manager,
            Box::new(mock_executor),
            1024,
            "DeleteExecutor".to_owned(),
            false,
            0,
            false,
        ));

        let handle = tokio::spawn(async move {
            let fields = &delete_executor.schema().fields;
            assert_eq!(fields[0].data_type, DataType::Int64);

            let mut stream = delete_executor.execute();
            let result = stream.next().await.unwrap().unwrap();

            // Five input rows => count of 5.
            assert_eq!(
                result.column_at(0).as_int64().iter().collect::<Vec<_>>(),
                vec![Some(5)]
            );
        });

        // The write side sees Begin first ...
        reader.next().await.unwrap()?.into_begin().unwrap();

        // ... then one chunk where every op is Delete and the columns match
        // the mock input ...
        let txn_msg = reader.next().await.unwrap()?;
        let chunk = txn_msg.as_stream_chunk().unwrap();
        assert_eq!(chunk.ops().to_vec(), vec![Op::Delete; 5]);

        assert_eq!(
            chunk.columns()[0].as_int32().iter().collect::<Vec<_>>(),
            vec![Some(1), Some(3), Some(5), Some(7), Some(9)]
        );

        assert_eq!(
            chunk.columns()[1].as_int32().iter().collect::<Vec<_>>(),
            vec![Some(2), Some(4), Some(6), Some(8), Some(10)]
        );

        // ... and End to close the transaction.
        reader.next().await.unwrap()?.into_end().unwrap();

        handle.await.unwrap();

        Ok(())
    }
}
322}