1use risingwave_common::cast::str_to_bool;
16use risingwave_common::types::{DataType, Date, Decimal, ScalarImpl, Time, Timestamp, Timestamptz};
17
18use super::unified::{AccessError, AccessResult};
19use super::{ByteStreamSourceParser, CsvProperties};
20use crate::error::ConnectorResult;
21use crate::only_parse_payload;
22use crate::parser::{ParserFormat, SourceStreamChunkRowWriter};
23use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
24
/// Parses the string-like binding `$v` into `$t` with `str::parse`.
///
/// Expands to a `Result<$t, AccessError>`: a successful parse is passed
/// through unchanged, while any parse failure is replaced by
/// `AccessError::TypeError` whose `expected` is the spelled-out target type,
/// whose `got` is always `"string"`, and whose `value` is the offending text.
macro_rules! parse {
    ($v:ident, $t:ty) => {
        match $v.parse::<$t>() {
            Ok(parsed) => Ok(parsed),
            // The concrete parse error is intentionally discarded; callers only
            // see which type was expected and which text failed to parse.
            Err(_) => Err(AccessError::TypeError {
                expected: stringify!($t).to_owned(),
                got: "string".to_owned(),
                value: $v.to_string(),
            }),
        }
    };
}
34
35#[derive(Debug)]
37pub struct CsvParser {
38 rw_columns: Vec<SourceColumnDesc>,
39 source_ctx: SourceContextRef,
40 headers: Option<Vec<String>>,
41 delimiter: u8,
42}
43
44impl CsvParser {
45 pub fn new(
46 rw_columns: Vec<SourceColumnDesc>,
47 csv_props: CsvProperties,
48 source_ctx: SourceContextRef,
49 ) -> ConnectorResult<Self> {
50 let CsvProperties {
51 delimiter,
52 has_header,
53 } = csv_props;
54
55 Ok(Self {
56 rw_columns,
57 delimiter,
58 headers: if has_header { Some(Vec::new()) } else { None },
59 source_ctx,
60 })
61 }
62
63 fn read_row(&self, buf: &[u8]) -> ConnectorResult<Vec<String>> {
64 let mut reader_builder = csv::ReaderBuilder::default();
65 reader_builder.delimiter(self.delimiter).has_headers(false);
66 let record = reader_builder
67 .from_reader(buf)
68 .records()
69 .next()
70 .transpose()?;
71 Ok(record
72 .map(|record| record.iter().map(|field| field.to_owned()).collect())
73 .unwrap_or_default())
74 }
75
76 #[inline]
77 fn parse_string(dtype: &DataType, v: String) -> AccessResult {
78 let v = match dtype {
79 DataType::Boolean => {
81 str_to_bool(&v)
82 .map(ScalarImpl::Bool)
83 .map_err(|_| AccessError::TypeError {
84 expected: "boolean".to_owned(),
85 got: "string".to_owned(),
86 value: v,
87 })?
88 }
89 DataType::Int16 => parse!(v, i16)?.into(),
90 DataType::Int32 => parse!(v, i32)?.into(),
91 DataType::Int64 => parse!(v, i64)?.into(),
92 DataType::Float32 => parse!(v, f32)?.into(),
93 DataType::Float64 => parse!(v, f64)?.into(),
94 DataType::Decimal => parse!(v, Decimal)?.into(),
96 DataType::Varchar => v.into(),
97 DataType::Date => parse!(v, Date)?.into(),
98 DataType::Time => parse!(v, Time)?.into(),
99 DataType::Timestamp => parse!(v, Timestamp)?.into(),
100 DataType::Timestamptz => parse!(v, Timestamptz)?.into(),
101 _ => {
102 return Err(AccessError::UnsupportedType {
103 ty: dtype.to_string(),
104 });
105 }
106 };
107 Ok(Some(v))
108 }
109
110 #[allow(clippy::unused_async)]
111 pub async fn parse_inner(
112 &mut self,
113 payload: Vec<u8>,
114 mut writer: SourceStreamChunkRowWriter<'_>,
115 ) -> ConnectorResult<()> {
116 let mut fields = self.read_row(&payload)?;
117
118 if let Some(headers) = &mut self.headers {
119 if headers.is_empty() {
120 *headers = fields;
121 return Ok(());
123 }
124 writer.do_insert(|desc| {
125 if let Some(i) = headers.iter().position(|name| name == &desc.name) {
126 let value = fields.get_mut(i).map(std::mem::take).unwrap_or_default();
127 if value.is_empty() {
128 return Ok(None);
129 }
130 Self::parse_string(&desc.data_type, value)
131 } else {
132 Ok(None)
133 }
134 })?;
135 } else {
136 fields.reverse();
137 writer.do_insert(|desc| {
138 if let Some(value) = fields.pop() {
139 if value.is_empty() {
140 return Ok(None);
141 }
142 Self::parse_string(&desc.data_type, value)
143 } else {
144 Ok(None)
145 }
146 })?;
147 }
148
149 Ok(())
150 }
151}
152
153impl ByteStreamSourceParser for CsvParser {
154 fn columns(&self) -> &[SourceColumnDesc] {
155 &self.rw_columns
156 }
157
158 fn source_ctx(&self) -> &SourceContext {
159 &self.source_ctx
160 }
161
162 fn parser_format(&self) -> ParserFormat {
163 ParserFormat::Csv
164 }
165
166 async fn parse_one<'a>(
167 &'a mut self,
168 _key: Option<Vec<u8>>,
169 payload: Option<Vec<u8>>,
170 writer: SourceStreamChunkRowWriter<'a>,
171 ) -> ConnectorResult<()> {
172 only_parse_payload!(self, payload, writer)
173 }
174}
175
176#[cfg(test)]
177mod tests {
178 use risingwave_common::array::Op;
179 use risingwave_common::row::Row;
180 use risingwave_common::types::{DataType, ToOwnedDatum};
181
182 use super::*;
183 use crate::parser::SourceStreamChunkBuilder;
184 use crate::source::SourceCtrlOpts;
185
186 #[tokio::test]
187 async fn test_csv_without_headers() {
188 let data = vec![
189 r#"1,a,2"#,
190 r#""15541","a,1,1,",4"#,
191 r#"0,"""0",0"#,
192 r#"0,0,0,0,0,0,0,0,0,0,0,0,0,"#,
193 r#",,,,"#,
194 ];
195 let descs = vec![
196 SourceColumnDesc::simple("a", DataType::Int32, 0.into()),
197 SourceColumnDesc::simple("b", DataType::Varchar, 1.into()),
198 SourceColumnDesc::simple("c", DataType::Int32, 2.into()),
199 ];
200 let mut parser = CsvParser::new(
201 Vec::new(),
202 CsvProperties {
203 delimiter: b',',
204 has_header: false,
205 },
206 SourceContext::dummy().into(),
207 )
208 .unwrap();
209 let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());
210 for item in data {
211 parser
212 .parse_inner(item.as_bytes().to_vec(), builder.row_writer())
213 .await
214 .unwrap();
215 }
216 builder.finish_current_chunk();
217 let chunk = builder.consume_ready_chunks().next().unwrap();
218 let mut rows = chunk.rows();
219 {
220 let (op, row) = rows.next().unwrap();
221 assert_eq!(op, Op::Insert);
222 assert_eq!(
223 row.datum_at(0).to_owned_datum(),
224 (Some(ScalarImpl::Int32(1)))
225 );
226 assert_eq!(
227 row.datum_at(1).to_owned_datum(),
228 (Some(ScalarImpl::Utf8("a".into())))
229 );
230 assert_eq!(
231 row.datum_at(2).to_owned_datum(),
232 (Some(ScalarImpl::Int32(2)))
233 );
234 }
235 {
236 let (op, row) = rows.next().unwrap();
237 assert_eq!(op, Op::Insert);
238 assert_eq!(
239 row.datum_at(0).to_owned_datum(),
240 (Some(ScalarImpl::Int32(15541)))
241 );
242 assert_eq!(
243 row.datum_at(1).to_owned_datum(),
244 (Some(ScalarImpl::Utf8("a,1,1,".into())))
245 );
246 assert_eq!(
247 row.datum_at(2).to_owned_datum(),
248 (Some(ScalarImpl::Int32(4)))
249 );
250 }
251
252 {
253 let (op, row) = rows.next().unwrap();
254 assert_eq!(op, Op::Insert);
255 assert_eq!(
256 row.datum_at(0).to_owned_datum(),
257 (Some(ScalarImpl::Int32(0)))
258 );
259 assert_eq!(
260 row.datum_at(1).to_owned_datum(),
261 (Some(ScalarImpl::Utf8("\"0".into())))
262 );
263 assert_eq!(
264 row.datum_at(2).to_owned_datum(),
265 (Some(ScalarImpl::Int32(0)))
266 );
267 }
268
269 {
270 let (op, row) = rows.next().unwrap();
271 assert_eq!(op, Op::Insert);
272 assert_eq!(
273 row.datum_at(0).to_owned_datum(),
274 (Some(ScalarImpl::Int32(0)))
275 );
276 assert_eq!(
277 row.datum_at(1).to_owned_datum(),
278 (Some(ScalarImpl::Utf8("0".into())))
279 );
280 assert_eq!(
281 row.datum_at(2).to_owned_datum(),
282 (Some(ScalarImpl::Int32(0)))
283 );
284 }
285
286 {
287 let (op, row) = rows.next().unwrap();
288 assert_eq!(op, Op::Insert);
289 assert_eq!(row.datum_at(0), None);
290 assert_eq!(row.datum_at(1), None);
291 assert_eq!(row.datum_at(2), None);
292 }
293 }
294 #[tokio::test]
295 async fn test_csv_with_headers() {
296 let data = [
297 r#"c,b,a"#,
298 r#"1,a,2"#,
299 r#""15541","a,1,1,",4"#,
300 r#"0,"""0",0"#,
301 r#"0,0,0,0,0,0,0,0,0,0,0,0,0,"#,
302 ];
303 let descs = vec![
304 SourceColumnDesc::simple("a", DataType::Int32, 0.into()),
305 SourceColumnDesc::simple("b", DataType::Varchar, 1.into()),
306 SourceColumnDesc::simple("c", DataType::Int32, 2.into()),
307 ];
308 let mut parser = CsvParser::new(
309 Vec::new(),
310 CsvProperties {
311 delimiter: b',',
312 has_header: true,
313 },
314 SourceContext::dummy().into(),
315 )
316 .unwrap();
317 let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());
318 for item in data {
319 let _ = parser
320 .parse_inner(item.as_bytes().to_vec(), builder.row_writer())
321 .await;
322 }
323 builder.finish_current_chunk();
324 let chunk = builder.consume_ready_chunks().next().unwrap();
325 let mut rows = chunk.rows();
326 {
327 let (op, row) = rows.next().unwrap();
328 assert_eq!(op, Op::Insert);
329 assert_eq!(
330 row.datum_at(2).to_owned_datum(),
331 (Some(ScalarImpl::Int32(1)))
332 );
333 assert_eq!(
334 row.datum_at(1).to_owned_datum(),
335 (Some(ScalarImpl::Utf8("a".into())))
336 );
337 assert_eq!(
338 row.datum_at(0).to_owned_datum(),
339 (Some(ScalarImpl::Int32(2)))
340 );
341 }
342 {
343 let (op, row) = rows.next().unwrap();
344 assert_eq!(op, Op::Insert);
345 assert_eq!(
346 row.datum_at(2).to_owned_datum(),
347 (Some(ScalarImpl::Int32(15541)))
348 );
349 assert_eq!(
350 row.datum_at(1).to_owned_datum(),
351 (Some(ScalarImpl::Utf8("a,1,1,".into())))
352 );
353 assert_eq!(
354 row.datum_at(0).to_owned_datum(),
355 (Some(ScalarImpl::Int32(4)))
356 );
357 }
358
359 {
360 let (op, row) = rows.next().unwrap();
361 assert_eq!(op, Op::Insert);
362 assert_eq!(
363 row.datum_at(2).to_owned_datum(),
364 (Some(ScalarImpl::Int32(0)))
365 );
366 assert_eq!(
367 row.datum_at(1).to_owned_datum(),
368 (Some(ScalarImpl::Utf8("\"0".into())))
369 );
370 assert_eq!(
371 row.datum_at(0).to_owned_datum(),
372 (Some(ScalarImpl::Int32(0)))
373 );
374 }
375
376 {
377 let (op, row) = rows.next().unwrap();
378 assert_eq!(op, Op::Insert);
379 assert_eq!(
380 row.datum_at(2).to_owned_datum(),
381 (Some(ScalarImpl::Int32(0)))
382 );
383 assert_eq!(
384 row.datum_at(1).to_owned_datum(),
385 (Some(ScalarImpl::Utf8("0".into())))
386 );
387 assert_eq!(
388 row.datum_at(0).to_owned_datum(),
389 (Some(ScalarImpl::Int32(0)))
390 );
391 }
392 }
393
394 #[test]
395 fn test_parse_boolean() {
396 assert_eq!(
397 CsvParser::parse_string(&DataType::Boolean, "1".to_owned()).unwrap(),
398 Some(true.into())
399 );
400 assert_eq!(
401 CsvParser::parse_string(&DataType::Boolean, "t".to_owned()).unwrap(),
402 Some(true.into())
403 );
404 assert_eq!(
405 CsvParser::parse_string(&DataType::Boolean, "T".to_owned()).unwrap(),
406 Some(true.into())
407 );
408 assert_eq!(
409 CsvParser::parse_string(&DataType::Boolean, "true".to_owned()).unwrap(),
410 Some(true.into())
411 );
412 assert_eq!(
413 CsvParser::parse_string(&DataType::Boolean, "TRUE".to_owned()).unwrap(),
414 Some(true.into())
415 );
416 assert_eq!(
417 CsvParser::parse_string(&DataType::Boolean, "True".to_owned()).unwrap(),
418 Some(true.into())
419 );
420
421 assert_eq!(
422 CsvParser::parse_string(&DataType::Boolean, "0".to_owned()).unwrap(),
423 Some(false.into())
424 );
425 assert_eq!(
426 CsvParser::parse_string(&DataType::Boolean, "f".to_owned()).unwrap(),
427 Some(false.into())
428 );
429 assert_eq!(
430 CsvParser::parse_string(&DataType::Boolean, "F".to_owned()).unwrap(),
431 Some(false.into())
432 );
433 assert_eq!(
434 CsvParser::parse_string(&DataType::Boolean, "false".to_owned()).unwrap(),
435 Some(false.into())
436 );
437 assert_eq!(
438 CsvParser::parse_string(&DataType::Boolean, "FALSE".to_owned()).unwrap(),
439 Some(false.into())
440 );
441 assert_eq!(
442 CsvParser::parse_string(&DataType::Boolean, "False".to_owned()).unwrap(),
443 Some(false.into())
444 );
445
446 assert!(CsvParser::parse_string(&DataType::Boolean, "2".to_owned()).is_err());
447 assert!(CsvParser::parse_string(&DataType::Boolean, "t1".to_owned()).is_err());
448 assert!(CsvParser::parse_string(&DataType::Boolean, "f1".to_owned()).is_err());
449 assert!(CsvParser::parse_string(&DataType::Boolean, "false1".to_owned()).is_err());
450 assert!(CsvParser::parse_string(&DataType::Boolean, "TRUE1".to_owned()).is_err());
451 }
452}