risingwave_connector/source/filesystem/
file_common.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14use std::fmt::Debug;
15use std::hash::Hash;
16use std::marker::PhantomData;
17
18use aws_sdk_s3::types::Object;
19use risingwave_common::types::{JsonbVal, Timestamptz};
20use serde::{Deserialize, Serialize};
21use strum::Display;
22
23use super::opendal_source::OpendalSource;
24use crate::error::ConnectorResult;
25use crate::source::{SplitId, SplitMetaData};
26
/// [`OpendalFsSplit`] describes a file or a split of a file. A file is a generic concept,
/// and can be a local file, a file in a distributed file system, or an object in an S3 bucket.
///
/// `Src` is only carried as [`PhantomData`]; no value of that type is stored in the split.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct OpendalFsSplit<Src: OpendalSource> {
    /// Name of the file. It is also what `SplitMetaData::id` returns for this split.
    pub name: String,
    /// Current position within the file; overwritten by `SplitMetaData::update_offset`.
    /// NOTE(review): the unit (bytes vs. rows) is not visible here — confirm against the reader.
    pub offset: usize,
    /// For Parquet encoding, the `size` represents the number of rows, while for other
    /// encodings, the `size` denotes the file size.
    pub size: usize,
    /// Zero-sized marker tying the split to its `Src` type parameter.
    _marker: PhantomData<Src>,
}
37
38impl<Src: OpendalSource> From<&Object> for OpendalFsSplit<Src> {
39    fn from(value: &Object) -> Self {
40        Self {
41            name: value.key().unwrap().to_owned(),
42            offset: 0,
43            size: value.size().unwrap_or_default() as usize,
44            _marker: PhantomData,
45        }
46    }
47}
48
49impl<Src: OpendalSource> SplitMetaData for OpendalFsSplit<Src> {
50    fn id(&self) -> SplitId {
51        self.name.as_str().into()
52    }
53
54    fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
55        serde_json::from_value(value.take()).map_err(Into::into)
56    }
57
58    fn encode_to_json(&self) -> JsonbVal {
59        serde_json::to_value(self.clone()).unwrap().into()
60    }
61
62    fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
63        let offset = last_seen_offset.parse().unwrap();
64        self.offset = offset;
65        Ok(())
66    }
67}
68
69impl<Src: OpendalSource> OpendalFsSplit<Src> {
70    pub fn new(name: String, start: usize, size: usize) -> Self {
71        Self {
72            name,
73            offset: start,
74            size,
75            _marker: PhantomData,
76        }
77    }
78
79    pub fn empty_split() -> Self {
80        Self {
81            name: "empty_split".to_owned(),
82            offset: 0,
83            size: 0,
84            _marker: PhantomData,
85        }
86    }
87}
88
/// A single file entry, carrying the file's name, size and a timestamp.
#[derive(Clone, Debug)]
pub struct FsPageItem {
    /// Name of the file.
    pub name: String,
    /// Size of the file (presumably in bytes — TODO confirm against the producer).
    pub size: i64,
    /// Timestamp attached to the file (presumably its last-modified time — TODO confirm).
    pub timestamp: Timestamptz,
}
95
/// A page of file entries, represented as a `Vec` of [`FsPageItem`].
pub type FsPage = Vec<FsPageItem>;
97
/// Compression format of a source file.
///
/// `Display` is derived through `strum` and `Deserialize` through `serde`.
#[derive(Debug, Default, Clone, PartialEq, Display, Deserialize)]
pub enum CompressionFormat {
    /// No compression. This is the `Default` variant.
    #[default]
    None,

    /// Gzip compression. Deserialized from `"gzip"`, with `"gz"` accepted as an alias.
    #[serde(rename = "gzip", alias = "gz")]
    Gzip,
}