risingwave_connector/source/reader/
fs_reader.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![deprecated = "will be replaced by new fs source (list + fetch)"]
16
17use std::sync::Arc;
18
19use anyhow::Context;
20use futures::StreamExt;
21use futures::stream::pending;
22use risingwave_common::catalog::ColumnId;
23
24use crate::WithOptionsSecResolved;
25use crate::error::ConnectorResult;
26use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
27use crate::source::{
28    BoxSourceChunkStream, ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext,
29};
30
31#[derive(Clone, Debug)]
32pub struct LegacyFsSourceReader {
33    pub config: ConnectorProperties,
34    pub columns: Vec<SourceColumnDesc>,
35    pub properties: WithOptionsSecResolved,
36    pub parser_config: SpecificParserConfig,
37}
38
39impl LegacyFsSourceReader {
40    pub fn new(
41        properties: WithOptionsSecResolved,
42        columns: Vec<SourceColumnDesc>,
43        parser_config: SpecificParserConfig,
44    ) -> ConnectorResult<Self> {
45        // Store the connector node address to properties for later use.
46        let config = ConnectorProperties::extract(properties.clone(), false)?;
47
48        Ok(Self {
49            config,
50            columns,
51            properties,
52            parser_config,
53        })
54    }
55
56    fn get_target_columns(
57        &self,
58        column_ids: Vec<ColumnId>,
59    ) -> ConnectorResult<Vec<SourceColumnDesc>> {
60        column_ids
61            .iter()
62            .map(|id| {
63                self.columns
64                    .iter()
65                    .find(|c| c.column_id == *id)
66                    .with_context(|| {
67                        format!("Failed to find column id: {} in source: {:?}", id, self)
68                    })
69                    .cloned()
70            })
71            .try_collect()
72            .map_err(Into::into)
73    }
74
75    pub async fn to_stream(
76        &self,
77        state: ConnectorState,
78        column_ids: Vec<ColumnId>,
79        source_ctx: Arc<SourceContext>,
80    ) -> ConnectorResult<BoxSourceChunkStream> {
81        let config = self.config.clone();
82        let columns = self.get_target_columns(column_ids)?;
83
84        let parser_config = ParserConfig {
85            specific: self.parser_config.clone(),
86            common: CommonParserConfig {
87                rw_columns: columns,
88            },
89        };
90        let stream = match state {
91            None => pending().boxed(),
92            Some(splits) => {
93                config
94                    .create_split_reader(
95                        splits,
96                        parser_config,
97                        source_ctx,
98                        None,
99                        Default::default(),
100                    )
101                    .await?
102                    .0
103            }
104        };
105        Ok(stream)
106    }
107}