// risingwave_connector/parser/bytes_parser.rs
use risingwave_common::try_match_expand;
use super::unified::bytes::BytesAccess;
use super::unified::AccessImpl;
use super::{AccessBuilder, EncodingProperties};
use crate::error::ConnectorResult;
/// Access builder for the bytes encoding: every payload it receives is handed,
/// unmodified, to `BytesAccess::new`.
#[derive(Debug)]
pub struct BytesAccessBuilder {
    /// Optional column name copied from the bytes encoding config and passed by
    /// reference to `BytesAccess::new` alongside each payload.
    // NOTE(review): presumably selects which column receives the raw bytes —
    // confirm against `BytesAccess`.
    column_name: Option<String>,
}
impl AccessBuilder for BytesAccessBuilder {
    /// Wraps `payload` in a `BytesAccess` that borrows this builder's
    /// `column_name`, and returns it as `AccessImpl::Bytes`.
    ///
    /// This implementation has no failure path of its own: the result is
    /// always `Ok`.
    // The signature is `async`, but nothing in the body awaits, so the lint
    // is silenced here.
    #[allow(clippy::unused_async)]
    async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_>> {
        let accessor = BytesAccess::new(&self.column_name, payload);
        Ok(AccessImpl::Bytes(accessor))
    }
}
impl BytesAccessBuilder {
    /// Builds a `BytesAccessBuilder` from the bytes variant of the encoding
    /// properties, keeping only its `column_name`.
    ///
    /// # Errors
    ///
    /// Propagates the error produced by `try_match_expand!` — presumably
    /// raised when `encoding_properties` is not `EncodingProperties::Bytes`
    /// (confirm against the macro's definition).
    pub fn new(encoding_properties: EncodingProperties) -> ConnectorResult<Self> {
        let bytes_config = try_match_expand!(encoding_properties, EncodingProperties::Bytes)?;
        let column_name = bytes_config.column_name;
        Ok(Self { column_name })
    }
}
#[cfg(test)]
mod tests {
    use risingwave_common::array::Op;
    use risingwave_common::row::Row;
    use risingwave_common::types::{DataType, ScalarImpl, ToOwnedDatum};

    use crate::parser::plain_parser::PlainParser;
    use crate::parser::{
        BytesProperties, EncodingProperties, ProtocolProperties, SourceColumnDesc,
        SourceStreamChunkBuilder, SpecificParserConfig,
    };
    use crate::source::SourceContext;

    /// The two raw messages fed to the parser, in order.
    fn get_payload() -> Vec<Vec<u8>> {
        vec![b"t".to_vec(), b"random".to_vec()]
    }

    /// Parses every message produced by `payload_source` through a plain
    /// parser configured with the bytes encoding and a single `Bytea` column,
    /// then checks that the first two rows of the resulting chunk are inserts
    /// carrying the bytes of "t" and "random" respectively.
    async fn test_bytes_parser(payload_source: fn() -> Vec<Vec<u8>>) {
        let columns = vec![SourceColumnDesc::simple("id", DataType::Bytea, 0.into())];
        let encoding_config = EncodingProperties::Bytes(BytesProperties { column_name: None });
        let parser_config = SpecificParserConfig {
            encoding_config,
            protocol_config: ProtocolProperties::Plain,
        };
        let mut parser = PlainParser::new(
            parser_config,
            columns.clone(),
            SourceContext::dummy().into(),
        )
        .await
        .unwrap();

        let mut chunk_builder = SourceStreamChunkBuilder::with_capacity(columns, 2);
        for message in payload_source() {
            let row_writer = chunk_builder.row_writer();
            parser
                .parse_inner(None, Some(message), row_writer)
                .await
                .unwrap();
        }

        let chunk = chunk_builder.finish();
        let mut rows = chunk.rows();
        // Rows are expected in the same order as the messages were parsed.
        for expected in ["t", "random"] {
            let (op, row) = rows.next().unwrap();
            assert_eq!(op, Op::Insert);
            assert_eq!(
                row.datum_at(0).to_owned_datum(),
                Some(ScalarImpl::Bytea(expected.as_bytes().into()))
            );
        }
    }

    #[tokio::test]
    async fn test_bytes_parse_object_top_level() {
        test_bytes_parser(get_payload).await;
    }
}