risingwave_connector/source/filesystem/
file_common.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::hash::Hash;
17use std::marker::PhantomData;
18
19use aws_sdk_s3::types::Object;
20use risingwave_common::types::{JsonbVal, Timestamptz};
21use serde::{Deserialize, Serialize};
22use strum::Display;
23
24use super::opendal_source::OpendalSource;
25use crate::error::ConnectorResult;
26use crate::source::{SplitId, SplitMetaData};
27
/// [`OpendalFsSplit`] describes a file or a split of a file. A file is a generic concept,
/// and can be a local file, a file on a distributed file system, or an object in an S3 bucket.
///
/// The `Src` type parameter only tags the split with the [`OpendalSource`] it belongs to;
/// no value of that type is stored in the split.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct OpendalFsSplit<Src: OpendalSource> {
    /// Name of the file. It doubles as the split id, and when the split is built from an
    /// S3 [`Object`] it is the object key.
    pub name: String,
    /// Offset into the file. Splits built from an S3 object start at 0, and
    /// `update_offset` overwrites this with the last seen offset.
    /// NOTE(review): the unit (bytes vs. rows) is not visible in this file — confirm
    /// against the reader.
    pub offset: usize,
    /// For Parquet encoding, the `size` represents the number of rows, while for other
    /// encodings, the `size` denotes the file size.
    pub size: usize,
    /// Zero-sized marker that binds the split to its source type `Src`.
    _marker: PhantomData<Src>,
}
38
39impl<Src: OpendalSource> From<&Object> for OpendalFsSplit<Src> {
40    fn from(value: &Object) -> Self {
41        Self {
42            name: value.key().unwrap().to_owned(),
43            offset: 0,
44            size: value.size().unwrap_or_default() as usize,
45            _marker: PhantomData,
46        }
47    }
48}
49
50impl<Src: OpendalSource> SplitMetaData for OpendalFsSplit<Src> {
51    fn id(&self) -> SplitId {
52        self.name.as_str().into()
53    }
54
55    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
56        serde_json::from_value(value.take()).map_err(Into::into)
57    }
58
59    fn encode_to_json(&self) -> JsonbVal {
60        serde_json::to_value(self.clone()).unwrap().into()
61    }
62
63    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
64        let offset = last_seen_offset.parse().unwrap();
65        self.offset = offset;
66        Ok(())
67    }
68}
69
70impl<Src: OpendalSource> OpendalFsSplit<Src> {
71    pub fn new(name: String, start: usize, size: usize) -> Self {
72        Self {
73            name,
74            offset: start,
75            size,
76            _marker: PhantomData,
77        }
78    }
79
80    pub fn empty_split() -> Self {
81        Self {
82            name: "empty_split".to_owned(),
83            offset: 0,
84            size: 0,
85            _marker: PhantomData,
86        }
87    }
88}
89
/// A single named entry carrying a size and a timestamp; entries are grouped into an
/// [`FsPage`].
#[derive(Clone, Debug)]
pub struct FsPageItem {
    /// Name of the entry.
    pub name: String,
    /// Size of the entry. NOTE(review): presumably in bytes — confirm against the lister.
    pub size: i64,
    /// Timestamp attached to the entry. NOTE(review): presumably the last-modified
    /// time — confirm against the lister.
    pub timestamp: Timestamptz,
}

/// A batch of [`FsPageItem`]s.
pub type FsPage = Vec<FsPageItem>;
98
/// Compression format option.
/// NOTE(review): where this option is consumed is not visible in this file.
#[derive(Debug, Default, Clone, PartialEq, Display, Deserialize)]
pub enum CompressionFormat {
    /// No compression. This is the default variant; it carries no serde rename.
    #[default]
    None,

    /// Gzip compression. Deserialized from `"gzip"`, with `"gz"` accepted as an alias.
    #[serde(rename = "gzip", alias = "gz")]
    Gzip,
}