risingwave_connector/source/reader/
fs_reader.rs1#![deprecated = "will be replaced by new fs source (list + fetch)"]
16
17use std::sync::Arc;
18
19use anyhow::Context;
20use futures::stream::pending;
21use futures::{StreamExt, TryStreamExt};
22use risingwave_common::catalog::ColumnId;
23
24use crate::WithOptionsSecResolved;
25use crate::error::ConnectorResult;
26use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
27use crate::source::{
28 BoxSourceChunkStream, ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext,
29 SourceReaderEvent,
30};
31
32#[derive(Clone, Debug)]
33pub struct LegacyFsSourceReader {
34 pub config: ConnectorProperties,
35 pub columns: Vec<SourceColumnDesc>,
36 pub properties: WithOptionsSecResolved,
37 pub parser_config: SpecificParserConfig,
38}
39
40impl LegacyFsSourceReader {
41 pub fn new(
42 properties: WithOptionsSecResolved,
43 columns: Vec<SourceColumnDesc>,
44 parser_config: SpecificParserConfig,
45 ) -> ConnectorResult<Self> {
46 let config = ConnectorProperties::extract(properties.clone(), false)?;
48
49 Ok(Self {
50 config,
51 columns,
52 properties,
53 parser_config,
54 })
55 }
56
57 fn get_target_columns(
58 &self,
59 column_ids: Vec<ColumnId>,
60 ) -> ConnectorResult<Vec<SourceColumnDesc>> {
61 column_ids
62 .iter()
63 .map(|id| {
64 self.columns
65 .iter()
66 .find(|c| c.column_id == *id)
67 .with_context(|| {
68 format!("Failed to find column id: {} in source: {:?}", id, self)
69 })
70 .cloned()
71 })
72 .try_collect()
73 .map_err(Into::into)
74 }
75
76 pub async fn to_stream(
77 &self,
78 state: ConnectorState,
79 column_ids: Vec<ColumnId>,
80 source_ctx: Arc<SourceContext>,
81 ) -> ConnectorResult<BoxSourceChunkStream> {
82 let config = self.config.clone();
83 let columns = self.get_target_columns(column_ids)?;
84
85 let parser_config = ParserConfig {
86 specific: self.parser_config.clone(),
87 common: CommonParserConfig {
88 rw_columns: columns,
89 },
90 };
91 let stream = match state {
92 None => pending().boxed(),
93 Some(splits) => config
94 .create_split_reader(splits, parser_config, source_ctx, None, Default::default())
95 .await?
96 .0
97 .try_filter_map(|event| async move {
98 Ok(match event {
99 SourceReaderEvent::DataChunk(chunk) => Some(chunk),
100 SourceReaderEvent::SplitProgress(_) => None,
101 })
102 })
103 .boxed(),
104 };
105 Ok(stream)
106 }
107}