// risingwave_connector/source/filesystem/file_common.rs
use std::fmt::Debug;
15use std::hash::Hash;
16use std::marker::PhantomData;
17
18use aws_sdk_s3::types::Object;
19use risingwave_common::types::{JsonbVal, Timestamptz};
20use serde::{Deserialize, Serialize};
21use strum::Display;
22
23use super::opendal_source::OpendalSource;
24use crate::error::ConnectorResult;
25use crate::source::{SplitId, SplitMetaData};
26
/// A split handled by an OpenDAL-backed file source.
///
/// A split is identified by its file/object `name`, which is also used as its split id. It
/// records the current read position (`offset`) and the size of the underlying object. The
/// `Src` type parameter only tags which [`OpendalSource`] the split belongs to; no value of
/// that type is stored.
///
/// NOTE: field declaration order determines the order used by the derived `Hash`, `Debug` and
/// serde implementations, so reordering fields changes hashes and serialized output.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct OpendalFsSplit<Src: OpendalSource> {
    /// File / object key; also used as the split id.
    pub name: String,
    /// Current read position. It is set to `start` by `new` (0 for S3 objects and for the empty
    /// split) and overwritten with the last seen offset by `update_offset`.
    pub offset: usize,
    /// Size of the underlying object. When built from an S3 `Object`, this is the object's
    /// reported `size()`. NOTE(review): presumably bytes, but other encodings/sources may use
    /// a different unit (e.g. row count) — confirm against callers.
    pub size: usize,
    /// Zero-sized marker tying the split to its source type `Src`.
    _marker: PhantomData<Src>,
}
37
38impl<Src: OpendalSource> From<&Object> for OpendalFsSplit<Src> {
39 fn from(value: &Object) -> Self {
40 Self {
41 name: value.key().unwrap().to_owned(),
42 offset: 0,
43 size: value.size().unwrap_or_default() as usize,
44 _marker: PhantomData,
45 }
46 }
47}
48
49impl<Src: OpendalSource> SplitMetaData for OpendalFsSplit<Src> {
50 fn id(&self) -> SplitId {
51 self.name.as_str().into()
52 }
53
54 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
55 serde_json::from_value(value.take()).map_err(Into::into)
56 }
57
58 fn encode_to_json(&self) -> JsonbVal {
59 serde_json::to_value(self.clone()).unwrap().into()
60 }
61
62 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
63 let offset = last_seen_offset.parse().unwrap();
64 self.offset = offset;
65 Ok(())
66 }
67}
68
69impl<Src: OpendalSource> OpendalFsSplit<Src> {
70 pub fn new(name: String, start: usize, size: usize) -> Self {
71 Self {
72 name,
73 offset: start,
74 size,
75 _marker: PhantomData,
76 }
77 }
78
79 pub fn empty_split() -> Self {
80 Self {
81 name: "empty_split".to_owned(),
82 offset: 0,
83 size: 0,
84 _marker: PhantomData,
85 }
86 }
87}
88
/// One entry in a page of file/object metadata: its name, size and timestamp.
///
/// NOTE(review): presumably produced by a file-system/object-store listing — confirm
/// against the code that builds it.
#[derive(Clone, Debug)]
pub struct FsPageItem {
    /// File / object key.
    pub name: String,
    /// Size of the file/object. NOTE(review): unit not visible here (presumably bytes) — confirm.
    pub size: i64,
    /// Timestamp associated with the file/object. NOTE(review): presumably its last-modified
    /// time — verify against the producer.
    pub timestamp: Timestamptz,
}

/// A page of [`FsPageItem`] entries.
pub type FsPage = Vec<FsPageItem>;
97
98#[derive(Debug, Default, Clone, PartialEq, Display, Deserialize)]
99pub enum CompressionFormat {
100 #[default]
101 None,
102
103 #[serde(rename = "gzip", alias = "gz")]
104 Gzip,
105}