risingwave_connector/parser/
csv_parser.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::cast::str_to_bool;
16use risingwave_common::types::{DataType, Date, Decimal, ScalarImpl, Time, Timestamp, Timestamptz};
17
18use super::unified::{AccessError, AccessResult};
19use super::{ByteStreamSourceParser, CsvProperties};
20use crate::error::ConnectorResult;
21use crate::only_parse_payload;
22use crate::parser::{ParserFormat, SourceStreamChunkRowWriter};
23use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef};
24
/// Parses the string-like binding `$v` into `$t` using `str::parse`.
///
/// Expands to a `Result<$t, AccessError>`. Any parse failure is reported as
/// `AccessError::TypeError`, carrying the target type name and the offending text.
macro_rules! parse {
    ($v:ident, $t:ty) => {
        $v.parse::<$t>().map_err(|_parse_err| {
            // The underlying parse error is dropped; only the input text is reported.
            AccessError::TypeError {
                expected: String::from(stringify!($t)),
                got: String::from("string"),
                value: $v.to_string(),
            }
        })
    };
}
34
35/// Parser for CSV format
36#[derive(Debug)]
37pub struct CsvParser {
38    rw_columns: Vec<SourceColumnDesc>,
39    source_ctx: SourceContextRef,
40    headers: Option<Vec<String>>,
41    delimiter: u8,
42}
43
44impl CsvParser {
45    pub fn new(
46        rw_columns: Vec<SourceColumnDesc>,
47        csv_props: CsvProperties,
48        source_ctx: SourceContextRef,
49    ) -> ConnectorResult<Self> {
50        let CsvProperties {
51            delimiter,
52            has_header,
53        } = csv_props;
54
55        Ok(Self {
56            rw_columns,
57            delimiter,
58            headers: if has_header { Some(Vec::new()) } else { None },
59            source_ctx,
60        })
61    }
62
63    fn read_row(&self, buf: &[u8]) -> ConnectorResult<Vec<String>> {
64        let mut reader_builder = csv::ReaderBuilder::default();
65        reader_builder.delimiter(self.delimiter).has_headers(false);
66        let record = reader_builder
67            .from_reader(buf)
68            .records()
69            .next()
70            .transpose()?;
71        Ok(record
72            .map(|record| record.iter().map(|field| field.to_owned()).collect())
73            .unwrap_or_default())
74    }
75
76    #[inline]
77    fn parse_string(dtype: &DataType, v: String) -> AccessResult {
78        let v = match dtype {
79            // mysql use tinyint to represent boolean
80            DataType::Boolean => {
81                str_to_bool(&v)
82                    .map(ScalarImpl::Bool)
83                    .map_err(|_| AccessError::TypeError {
84                        expected: "boolean".to_owned(),
85                        got: "string".to_owned(),
86                        value: v,
87                    })?
88            }
89            DataType::Int16 => parse!(v, i16)?.into(),
90            DataType::Int32 => parse!(v, i32)?.into(),
91            DataType::Int64 => parse!(v, i64)?.into(),
92            DataType::Float32 => parse!(v, f32)?.into(),
93            DataType::Float64 => parse!(v, f64)?.into(),
94            // FIXME: decimal should have more precision than f64
95            DataType::Decimal => parse!(v, Decimal)?.into(),
96            DataType::Varchar => v.into(),
97            DataType::Date => parse!(v, Date)?.into(),
98            DataType::Time => parse!(v, Time)?.into(),
99            DataType::Timestamp => parse!(v, Timestamp)?.into(),
100            DataType::Timestamptz => parse!(v, Timestamptz)?.into(),
101            _ => {
102                return Err(AccessError::UnsupportedType {
103                    ty: dtype.to_string(),
104                });
105            }
106        };
107        Ok(Some(v))
108    }
109
110    #[allow(clippy::unused_async)]
111    pub async fn parse_inner(
112        &mut self,
113        payload: Vec<u8>,
114        mut writer: SourceStreamChunkRowWriter<'_>,
115    ) -> ConnectorResult<()> {
116        let mut fields = self.read_row(&payload)?;
117
118        if let Some(headers) = &mut self.headers {
119            if headers.is_empty() {
120                *headers = fields;
121                // The header row does not output a row, so we return early.
122                return Ok(());
123            }
124            writer.do_insert(|desc| {
125                if let Some(i) = headers.iter().position(|name| name == &desc.name) {
126                    let value = fields.get_mut(i).map(std::mem::take).unwrap_or_default();
127                    if value.is_empty() {
128                        return Ok(None);
129                    }
130                    Self::parse_string(&desc.data_type, value)
131                } else {
132                    Ok(None)
133                }
134            })?;
135        } else {
136            fields.reverse();
137            writer.do_insert(|desc| {
138                if let Some(value) = fields.pop() {
139                    if value.is_empty() {
140                        return Ok(None);
141                    }
142                    Self::parse_string(&desc.data_type, value)
143                } else {
144                    Ok(None)
145                }
146            })?;
147        }
148
149        Ok(())
150    }
151}
152
153impl ByteStreamSourceParser for CsvParser {
154    fn columns(&self) -> &[SourceColumnDesc] {
155        &self.rw_columns
156    }
157
158    fn source_ctx(&self) -> &SourceContext {
159        &self.source_ctx
160    }
161
162    fn parser_format(&self) -> ParserFormat {
163        ParserFormat::Csv
164    }
165
166    async fn parse_one<'a>(
167        &'a mut self,
168        _key: Option<Vec<u8>>,
169        payload: Option<Vec<u8>>,
170        writer: SourceStreamChunkRowWriter<'a>,
171    ) -> ConnectorResult<()> {
172        only_parse_payload!(self, payload, writer)
173    }
174}
175
#[cfg(test)]
mod tests {
    use risingwave_common::array::Op;
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, ToOwnedDatum};

    use super::*;
    use crate::parser::SourceStreamChunkBuilder;
    use crate::source::SourceCtrlOpts;

    /// One decoded output row: the operation plus the owned datums of columns
    /// `a`, `b` and `c`, in that order.
    type DecodedRow = (Op, Vec<Option<ScalarImpl>>);

    /// Shorthand for a non-null `Int32` datum.
    fn int(v: i32) -> Option<ScalarImpl> {
        Some(ScalarImpl::Int32(v))
    }

    /// Shorthand for a non-null `Utf8` datum.
    fn text(s: &str) -> Option<ScalarImpl> {
        Some(ScalarImpl::Utf8(s.into()))
    }

    /// Shorthand for an expected `Insert` row.
    fn insert(datums: [Option<ScalarImpl>; 3]) -> DecodedRow {
        (Op::Insert, Vec::from(datums))
    }

    /// Feeds every line to a fresh parser over columns `a: Int32`, `b: Varchar`,
    /// `c: Int32` and returns all rows of the first resulting chunk.
    ///
    /// Every `parse_inner` call is unwrapped so that a parse failure fails the
    /// test at the offending line instead of being silently dropped.
    async fn decode(has_header: bool, lines: &[&str]) -> Vec<DecodedRow> {
        let descs = vec![
            SourceColumnDesc::simple("a", DataType::Int32, 0.into()),
            SourceColumnDesc::simple("b", DataType::Varchar, 1.into()),
            SourceColumnDesc::simple("c", DataType::Int32, 2.into()),
        ];
        let column_count = descs.len();
        let mut parser = CsvParser::new(
            Vec::new(),
            CsvProperties {
                delimiter: b',',
                has_header,
            },
            SourceContext::dummy().into(),
        )
        .unwrap();
        let mut builder = SourceStreamChunkBuilder::new(descs, SourceCtrlOpts::for_test());
        for line in lines {
            parser
                .parse_inner(line.as_bytes().to_vec(), builder.row_writer())
                .await
                .unwrap();
        }
        builder.finish_current_chunk();
        let chunk = builder.consume_ready_chunks().next().unwrap();
        // Bind the result so the row iterator is dropped before `chunk`.
        let rows: Vec<DecodedRow> = chunk
            .rows()
            .map(|(op, row)| {
                let datums: Vec<Option<ScalarImpl>> = (0..column_count)
                    .map(|idx| row.datum_at(idx).to_owned_datum())
                    .collect();
                (op, datums)
            })
            .collect();
        rows
    }

    #[tokio::test]
    async fn test_csv_without_headers() {
        let rows = decode(
            false,
            &[
                r#"1,a,2"#,
                r#""15541","a,1,1,",4"#,
                r#"0,"""0",0"#,
                r#"0,0,0,0,0,0,0,0,0,0,0,0,0,"#,
                r#",,,,"#,
            ],
        )
        .await;

        // Fields map to columns positionally; surplus fields are ignored and
        // empty fields become NULL.
        assert_eq!(
            rows,
            vec![
                insert([int(1), text("a"), int(2)]),
                insert([int(15541), text("a,1,1,"), int(4)]),
                insert([int(0), text("\"0"), int(0)]),
                insert([int(0), text("0"), int(0)]),
                insert([None, None, None]),
            ]
        );
    }

    #[tokio::test]
    async fn test_csv_with_headers() {
        let rows = decode(
            true,
            &[
                r#"c,b,a"#,
                r#"1,a,2"#,
                r#""15541","a,1,1,",4"#,
                r#"0,"""0",0"#,
                r#"0,0,0,0,0,0,0,0,0,0,0,0,0,"#,
            ],
        )
        .await;

        // The header is `c,b,a`, so the first field lands in column `c` (index 2)
        // and the third field in column `a` (index 0). The header emits no row.
        assert_eq!(
            rows,
            vec![
                insert([int(2), text("a"), int(1)]),
                insert([int(4), text("a,1,1,"), int(15541)]),
                insert([int(0), text("\"0"), int(0)]),
                insert([int(0), text("0"), int(0)]),
            ]
        );
    }

    #[test]
    fn test_parse_boolean() {
        for truthy in ["1", "t", "T", "true", "TRUE", "True"] {
            assert_eq!(
                CsvParser::parse_string(&DataType::Boolean, truthy.to_owned()).unwrap(),
                Some(true.into()),
                "input: {truthy:?}"
            );
        }

        for falsy in ["0", "f", "F", "false", "FALSE", "False"] {
            assert_eq!(
                CsvParser::parse_string(&DataType::Boolean, falsy.to_owned()).unwrap(),
                Some(false.into()),
                "input: {falsy:?}"
            );
        }

        for invalid in ["2", "t1", "f1", "false1", "TRUE1"] {
            assert!(
                CsvParser::parse_string(&DataType::Boolean, invalid.to_owned()).is_err(),
                "input: {invalid:?}"
            );
        }
    }
}