// risingwave_connector/source/reader/fs_reader.rs
#![deprecated = "will be replaced by new fs source (list + fetch)"]
use std::sync::Arc;
use anyhow::Context;
use futures::stream::pending;
use futures::StreamExt;
use risingwave_common::catalog::ColumnId;
use crate::error::ConnectorResult;
use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
use crate::source::{
BoxSourceChunkStream, ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext,
};
use crate::WithOptionsSecResolved;
/// Reader for the legacy fs source.
///
/// Bundles the connector properties with the column descriptors and parser
/// settings that `to_stream` needs to build a chunk stream.
#[derive(Clone, Debug)]
pub struct LegacyFsSourceReader {
/// Connector properties extracted from `properties` in `new`.
pub config: ConnectorProperties,
/// Column descriptors supplied to `new`; `get_target_columns` searches
/// them by `column_id`.
pub columns: Vec<SourceColumnDesc>,
/// The options this reader was created from, stored as passed to `new`.
pub properties: WithOptionsSecResolved,
/// Format-specific parser settings; cloned into the `ParserConfig` built
/// by `to_stream`.
pub parser_config: SpecificParserConfig,
}
impl LegacyFsSourceReader {
    /// Creates a reader from resolved options, column descriptors and parser
    /// settings.
    ///
    /// # Errors
    ///
    /// Propagates the error from `ConnectorProperties::extract` when the
    /// connector properties cannot be derived from `properties`.
    pub fn new(
        properties: WithOptionsSecResolved,
        columns: Vec<SourceColumnDesc>,
        parser_config: SpecificParserConfig,
    ) -> ConnectorResult<Self> {
        // `extract` consumes its argument, so it is handed a clone and the
        // original options are stored alongside the extracted config.
        // NOTE(review): the meaning of the `false` flag is defined by
        // `ConnectorProperties::extract` — confirm there.
        let reader = Self {
            config: ConnectorProperties::extract(properties.clone(), false)?,
            columns,
            properties,
            parser_config,
        };
        Ok(reader)
    }

    /// Resolves each requested column id to a clone of its descriptor,
    /// preserving the order of `column_ids`.
    ///
    /// # Errors
    ///
    /// Fails on the first id that has no matching entry in `self.columns`.
    fn get_target_columns(
        &self,
        column_ids: Vec<ColumnId>,
    ) -> ConnectorResult<Vec<SourceColumnDesc>> {
        let mut selected = Vec::with_capacity(column_ids.len());
        for wanted in &column_ids {
            let desc = self
                .columns
                .iter()
                .find(|desc| desc.column_id == *wanted)
                .with_context(|| {
                    format!("Failed to find column id: {} in source: {:?}", wanted, self)
                })?;
            selected.push(desc.clone());
        }
        Ok(selected)
    }

    /// Builds the chunk stream for the given connector state.
    ///
    /// With no state (`None`) the returned stream never yields an item.
    /// Otherwise the splits are passed to `create_split_reader` together
    /// with a parser config restricted to the requested columns.
    ///
    /// # Errors
    ///
    /// Fails when a requested column id is unknown (checked even when
    /// `state` is `None`), or when `create_split_reader` fails.
    pub async fn to_stream(
        &self,
        state: ConnectorState,
        column_ids: Vec<ColumnId>,
        source_ctx: Arc<SourceContext>,
    ) -> ConnectorResult<BoxSourceChunkStream> {
        // Resolve the columns first so an unknown id is reported regardless
        // of whether there is any state to read.
        let rw_columns = self.get_target_columns(column_ids)?;

        let Some(splits) = state else {
            return Ok(pending().boxed());
        };

        let parser_config = ParserConfig {
            specific: self.parser_config.clone(),
            common: CommonParserConfig { rw_columns },
        };
        let connector = self.config.clone();

        // Only the first element of the result (the stream) is used.
        // NOTE(review): the trailing `None` / default arguments are defined
        // by `create_split_reader` — confirm their meaning there.
        let stream = connector
            .create_split_reader(
                splits,
                parser_config,
                source_ctx,
                None,
                Default::default(),
            )
            .await?
            .0;
        Ok(stream)
    }
}