risingwave_connector/source/reader/fs_reader.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15#![deprecated = "will be replaced by new fs source (list + fetch)"]
16
17use std::sync::Arc;
18
19use anyhow::Context;
20use futures::stream::pending;
21use futures::{StreamExt, TryStreamExt};
22use risingwave_common::catalog::ColumnId;
23
24use crate::WithOptionsSecResolved;
25use crate::error::ConnectorResult;
26use crate::parser::{CommonParserConfig, ParserConfig, SpecificParserConfig};
27use crate::source::{
28    BoxSourceChunkStream, ConnectorProperties, ConnectorState, SourceColumnDesc, SourceContext,
29    SourceReaderEvent,
30};
31
32#[derive(Clone, Debug)]
33pub struct LegacyFsSourceReader {
34    pub config: ConnectorProperties,
35    pub columns: Vec<SourceColumnDesc>,
36    pub properties: WithOptionsSecResolved,
37    pub parser_config: SpecificParserConfig,
38}
39
40impl LegacyFsSourceReader {
41    pub fn new(
42        properties: WithOptionsSecResolved,
43        columns: Vec<SourceColumnDesc>,
44        parser_config: SpecificParserConfig,
45    ) -> ConnectorResult<Self> {
46        // Store the connector node address to properties for later use.
47        let config = ConnectorProperties::extract(properties.clone(), false)?;
48
49        Ok(Self {
50            config,
51            columns,
52            properties,
53            parser_config,
54        })
55    }
56
57    fn get_target_columns(
58        &self,
59        column_ids: Vec<ColumnId>,
60    ) -> ConnectorResult<Vec<SourceColumnDesc>> {
61        column_ids
62            .iter()
63            .map(|id| {
64                self.columns
65                    .iter()
66                    .find(|c| c.column_id == *id)
67                    .with_context(|| {
68                        format!("Failed to find column id: {} in source: {:?}", id, self)
69                    })
70                    .cloned()
71            })
72            .try_collect()
73            .map_err(Into::into)
74    }
75
76    pub async fn to_stream(
77        &self,
78        state: ConnectorState,
79        column_ids: Vec<ColumnId>,
80        source_ctx: Arc<SourceContext>,
81    ) -> ConnectorResult<BoxSourceChunkStream> {
82        let config = self.config.clone();
83        let columns = self.get_target_columns(column_ids)?;
84
85        let parser_config = ParserConfig {
86            specific: self.parser_config.clone(),
87            common: CommonParserConfig {
88                rw_columns: columns,
89            },
90        };
91        let stream = match state {
92            None => pending().boxed(),
93            Some(splits) => config
94                .create_split_reader(splits, parser_config, source_ctx, None, Default::default())
95                .await?
96                .0
97                .try_filter_map(|event| async move {
98                    Ok(match event {
99                        SourceReaderEvent::DataChunk(chunk) => Some(chunk),
100                        SourceReaderEvent::SplitProgress(_) => None,
101                    })
102                })
103                .boxed(),
104        };
105        Ok(stream)
106    }
107}