risingwave_batch_executors/executor/
delete.rs1use futures_async_stream::try_stream;
16use itertools::Itertools;
17use risingwave_common::array::{
18 Array, ArrayBuilder, DataChunk, Op, PrimitiveArrayBuilder, StreamChunk,
19};
20use risingwave_common::catalog::{Field, Schema, TableId, TableVersionId};
21use risingwave_common::transaction::transaction_id::TxnId;
22use risingwave_common::types::DataType;
23use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
24use risingwave_dml::dml_manager::DmlManagerRef;
25use risingwave_pb::batch_plan::plan_node::NodeBody;
26
27use crate::error::{BatchError, Result};
28use crate::executor::{
29 BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
30};
31
32pub struct DeleteExecutor {
36 table_id: TableId,
38 table_version_id: TableVersionId,
39 dml_manager: DmlManagerRef,
40 child: BoxedExecutor,
41 chunk_size: usize,
42 schema: Schema,
43 identity: String,
44 returning: bool,
45 txn_id: TxnId,
46 session_id: u32,
47}
48
49impl DeleteExecutor {
50 pub fn new(
51 table_id: TableId,
52 table_version_id: TableVersionId,
53 dml_manager: DmlManagerRef,
54 child: BoxedExecutor,
55 chunk_size: usize,
56 identity: String,
57 returning: bool,
58 session_id: u32,
59 ) -> Self {
60 let table_schema = child.schema().clone();
61 let txn_id = dml_manager.gen_txn_id();
62 Self {
63 table_id,
64 table_version_id,
65 dml_manager,
66 child,
67 chunk_size,
68 schema: if returning {
69 table_schema
70 } else {
71 Schema {
72 fields: vec![Field::unnamed(DataType::Int64)],
73 }
74 },
75 identity,
76 returning,
77 txn_id,
78 session_id,
79 }
80 }
81}
82
83impl Executor for DeleteExecutor {
84 fn schema(&self) -> &Schema {
85 &self.schema
86 }
87
88 fn identity(&self) -> &str {
89 &self.identity
90 }
91
92 fn execute(self: Box<Self>) -> BoxedDataChunkStream {
93 self.do_execute()
94 }
95}
96
97impl DeleteExecutor {
98 #[try_stream(boxed, ok = DataChunk, error = BatchError)]
99 async fn do_execute(self: Box<Self>) {
100 let data_types = self.child.schema().data_types();
101 let mut builder = DataChunkBuilder::new(data_types, self.chunk_size);
102
103 let table_dml_handle = self
104 .dml_manager
105 .table_dml_handle(self.table_id, self.table_version_id)?;
106 assert_eq!(
107 table_dml_handle
108 .column_descs()
109 .iter()
110 .filter_map(|c| (!c.is_generated()).then_some(c.data_type.clone()))
111 .collect_vec(),
112 self.child.schema().data_types(),
113 "bad delete schema"
114 );
115 let mut write_handle = table_dml_handle.write_handle(self.session_id, self.txn_id)?;
116
117 write_handle.begin()?;
118
119 let write_txn_data = |chunk: DataChunk| async {
121 let cap = chunk.capacity();
122 let stream_chunk = StreamChunk::from_parts(vec![Op::Delete; cap], chunk);
123
124 #[cfg(debug_assertions)]
125 table_dml_handle.check_chunk_schema(&stream_chunk);
126
127 let cardinality = stream_chunk.cardinality();
128 write_handle.write_chunk(stream_chunk).await?;
129
130 Result::Ok(cardinality)
131 };
132
133 let mut rows_deleted = 0;
134
135 #[for_await]
136 for data_chunk in self.child.execute() {
137 let data_chunk = data_chunk?;
138 if self.returning {
139 yield data_chunk.clone();
140 }
141 for chunk in builder.append_chunk(data_chunk) {
142 rows_deleted += write_txn_data(chunk).await?;
143 }
144 }
145
146 if let Some(chunk) = builder.consume_all() {
147 rows_deleted += write_txn_data(chunk).await?;
148 }
149
150 write_handle.end().await?;
151
152 if !self.returning {
154 let mut array_builder = PrimitiveArrayBuilder::<i64>::new(1);
155 array_builder.append(Some(rows_deleted as i64));
156
157 let array = array_builder.finish();
158 let ret_chunk = DataChunk::new(vec![array.into_ref()], 1);
159
160 yield ret_chunk
161 }
162 }
163}
164
165impl BoxedExecutorBuilder for DeleteExecutor {
166 async fn new_boxed_executor(
167 source: &ExecutorBuilder<'_>,
168 inputs: Vec<BoxedExecutor>,
169 ) -> Result<BoxedExecutor> {
170 let [child]: [_; 1] = inputs.try_into().unwrap();
171 let delete_node = try_match_expand!(
172 source.plan_node().get_node_body().unwrap(),
173 NodeBody::Delete
174 )?;
175
176 let table_id = TableId::new(delete_node.table_id);
177
178 Ok(Box::new(Self::new(
179 table_id,
180 delete_node.table_version_id,
181 source.context().dml_manager(),
182 child,
183 source.context().get_config().developer.chunk_size,
184 source.plan_node().get_identity().clone(),
185 delete_node.returning,
186 delete_node.session_id,
187 )))
188 }
189}
190
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use futures::StreamExt;
    use risingwave_common::catalog::{
        ColumnDesc, ColumnId, INITIAL_TABLE_VERSION_ID, schema_test_utils,
    };
    use risingwave_common::test_prelude::DataChunkTestExt;
    use risingwave_dml::dml_manager::DmlManager;

    use super::*;
    use crate::executor::test_utils::MockExecutor;
    use crate::*;

    /// Deletes five two-column rows without `returning` and checks both sides:
    /// the executor reports a count of 5, and the DML reader observes
    /// begin -> one all-`Delete` chunk with the five rows -> end.
    #[tokio::test]
    async fn test_delete_executor() -> Result<()> {
        let dml_manager = Arc::new(DmlManager::for_test());

        // Child executor: two int columns, five rows.
        let mut child = MockExecutor::new(schema_test_utils::ii());
        child.add(DataChunk::from_pretty(
            "i i
             1 2
             3 4
             5 6
             7 8
             9 10",
        ));

        // Table side: same two-int layout, registered with the DML manager.
        let table_id = TableId::new(0);
        let table_schema = schema_test_utils::ii();
        let column_descs: Vec<_> = table_schema
            .fields
            .iter()
            .enumerate()
            .map(|(i, field)| ColumnDesc::unnamed(ColumnId::new(i as _), field.data_type.clone()))
            .collect();

        // NOTE(review): the original kept the registered handle alive for the whole test (it
        // was shadowed, not dropped), so it stays bound here too - presumably the manager
        // needs it to remain registered; confirm before chaining these calls.
        let table_handle = dml_manager
            .register_reader(table_id, INITIAL_TABLE_VERSION_ID, &column_descs)
            .unwrap();
        let mut reader = table_handle.stream_reader().into_stream();

        let executor = Box::new(DeleteExecutor::new(
            table_id,
            INITIAL_TABLE_VERSION_ID,
            dml_manager,
            Box::new(child),
            1024,
            "DeleteExecutor".to_owned(),
            false,
            0,
        ));

        // Run the executor concurrently with the reader below.
        let executor_task = tokio::spawn(async move {
            let output_fields = &executor.schema().fields;
            assert_eq!(output_fields[0].data_type, DataType::Int64);

            let mut output = executor.execute();
            let count_chunk = output.next().await.unwrap().unwrap();

            let counts: Vec<_> = count_chunk.column_at(0).as_int64().iter().collect();
            assert_eq!(counts, vec![Some(5)]);
        });

        // Transaction begin.
        reader.next().await.unwrap()?.into_begin().unwrap();

        // The single data message: five deletes carrying the original rows.
        let txn_msg = reader.next().await.unwrap()?;
        let chunk = txn_msg.as_stream_chunk().unwrap();
        assert_eq!(chunk.ops().to_vec(), vec![Op::Delete; 5]);

        let expected_columns = [
            vec![Some(1), Some(3), Some(5), Some(7), Some(9)],
            vec![Some(2), Some(4), Some(6), Some(8), Some(10)],
        ];
        for (idx, expected) in expected_columns.into_iter().enumerate() {
            let actual: Vec<_> = chunk.columns()[idx].as_int32().iter().collect();
            assert_eq!(actual, expected);
        }

        // Transaction end.
        reader.next().await.unwrap()?.into_end().unwrap();

        executor_task.await.unwrap();

        Ok(())
    }
}