// risingwave_connector/source/reader/fs_reader.rs
#![deprecated = "will be replaced by new fs source (list + fetch)"]
use std::sync::Arc;
use anyhow::Context;
use futures::stream::pending;
use futures::StreamExt;
use risingwave_common::catalog::ColumnId;
use crate::error::ConnectorResult;
use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
use crate::source::{
create_split_reader, BoxChunkSourceStream, ConnectorProperties, ConnectorState,
SourceColumnDesc, SourceContext, SplitReader,
};
use crate::{dispatch_source_prop, WithOptionsSecResolved};
/// Reader for a file-system based source.
///
/// Bundles the connector configuration, the source's column descriptors and
/// the parser configuration that `to_stream` needs to open a chunk stream.
///
/// The derived `Debug` output of this struct is embedded in the error message
/// produced when a requested column id cannot be found, so changing the
/// fields (or their order) changes that message.
#[derive(Clone, Debug)]
pub struct FsSourceReader {
/// Connector properties extracted from `properties` by `new`.
pub config: ConnectorProperties,
/// Descriptors of the source's columns; `to_stream` selects from these by
/// column id.
pub columns: Vec<SourceColumnDesc>,
/// The options this reader was created from, stored as passed to `new`.
pub properties: WithOptionsSecResolved,
/// Parser settings; cloned into the `specific` part of the `ParserConfig`
/// built by `to_stream`.
pub parser_config: SpecificParserConfig,
}
impl FsSourceReader {
    /// Creates a reader from the given options, column descriptors and parser
    /// configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if `ConnectorProperties::extract` fails on
    /// `properties`.
    pub fn new(
        properties: WithOptionsSecResolved,
        columns: Vec<SourceColumnDesc>,
        parser_config: SpecificParserConfig,
    ) -> ConnectorResult<Self> {
        // `properties` is cloned because the original value is kept on the
        // reader next to the extracted `config`.
        let config = ConnectorProperties::extract(properties.clone(), false)?;
        Ok(Self {
            config,
            columns,
            properties,
            parser_config,
        })
    }

    /// Resolves `column_ids` to clones of the matching descriptors in
    /// `self.columns`, preserving the order of `column_ids`.
    ///
    /// # Errors
    ///
    /// Fails on the first id that has no descriptor with an equal
    /// `column_id`.
    fn get_target_columns(
        &self,
        column_ids: Vec<ColumnId>,
    ) -> ConnectorResult<Vec<SourceColumnDesc>> {
        column_ids
            .iter()
            .map(|id| {
                self.columns
                    .iter()
                    .find(|c| c.column_id == *id)
                    .with_context(|| {
                        format!("Failed to find column id: {} in source: {:?}", id, self)
                    })
                    .cloned()
            })
            .try_collect()
            .map_err(Into::into)
    }

    /// Builds the chunk stream for the given connector state.
    ///
    /// * `state` - `None` yields a stream that never produces an item;
    ///   `Some(splits)` creates a split reader over `splits` and returns its
    ///   stream.
    /// * `column_ids` - ids of the columns to read; each must exist in
    ///   `self.columns`.
    /// * `source_ctx` - shared source context handed to the split reader.
    ///
    /// # Errors
    ///
    /// Returns an error if any id in `column_ids` is unknown (checked even
    /// when `state` is `None`), or if creating the split reader fails.
    pub async fn to_stream(
        &self,
        state: ConnectorState,
        column_ids: Vec<ColumnId>,
        source_ctx: Arc<SourceContext>,
    ) -> ConnectorResult<BoxChunkSourceStream> {
        // Resolve the columns before looking at `state` so that an invalid
        // column id is reported in both cases.
        let rw_columns = self.get_target_columns(column_ids)?;

        let stream = match state {
            None => pending().boxed(),
            Some(splits) => {
                // Only this arm consumes the connector and parser
                // configuration, so the clones are made here rather than
                // unconditionally.
                let config = self.config.clone();
                let parser_config = ParserConfig {
                    specific: self.parser_config.clone(),
                    common: CommonParserConfig { rw_columns },
                };
                dispatch_source_prop!(config, prop, {
                    create_split_reader(*prop, splits, parser_config, source_ctx, None)
                        .await?
                        .into_stream()
                })
            }
        };
        Ok(stream)
    }
}