risingwave_connector/source/reader/
fs_reader.rs1#![deprecated = "will be replaced by new fs source (list + fetch)"]
16
17use std::sync::Arc;
18
19use anyhow::Context;
20use futures::StreamExt;
21use futures::stream::pending;
22use risingwave_common::catalog::ColumnId;
23
24use crate::WithOptionsSecResolved;
25use crate::error::ConnectorResult;
26use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
27use crate::source::{
28 BoxSourceChunkStream, ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext,
29};
30
31#[derive(Clone, Debug)]
32pub struct LegacyFsSourceReader {
33 pub config: ConnectorProperties,
34 pub columns: Vec<SourceColumnDesc>,
35 pub properties: WithOptionsSecResolved,
36 pub parser_config: SpecificParserConfig,
37}
38
39impl LegacyFsSourceReader {
40 pub fn new(
41 properties: WithOptionsSecResolved,
42 columns: Vec<SourceColumnDesc>,
43 parser_config: SpecificParserConfig,
44 ) -> ConnectorResult<Self> {
45 let config = ConnectorProperties::extract(properties.clone(), false)?;
47
48 Ok(Self {
49 config,
50 columns,
51 properties,
52 parser_config,
53 })
54 }
55
56 fn get_target_columns(
57 &self,
58 column_ids: Vec<ColumnId>,
59 ) -> ConnectorResult<Vec<SourceColumnDesc>> {
60 column_ids
61 .iter()
62 .map(|id| {
63 self.columns
64 .iter()
65 .find(|c| c.column_id == *id)
66 .with_context(|| {
67 format!("Failed to find column id: {} in source: {:?}", id, self)
68 })
69 .cloned()
70 })
71 .try_collect()
72 .map_err(Into::into)
73 }
74
75 pub async fn to_stream(
76 &self,
77 state: ConnectorState,
78 column_ids: Vec<ColumnId>,
79 source_ctx: Arc<SourceContext>,
80 ) -> ConnectorResult<BoxSourceChunkStream> {
81 let config = self.config.clone();
82 let columns = self.get_target_columns(column_ids)?;
83
84 let parser_config = ParserConfig {
85 specific: self.parser_config.clone(),
86 common: CommonParserConfig {
87 rw_columns: columns,
88 },
89 };
90 let stream = match state {
91 None => pending().boxed(),
92 Some(splits) => {
93 config
94 .create_split_reader(
95 splits,
96 parser_config,
97 source_ctx,
98 None,
99 Default::default(),
100 )
101 .await?
102 .0
103 }
104 };
105 Ok(stream)
106 }
107}