risingwave_connector/source/filesystem/
file_common.rs1use std::fmt::Debug;
16use std::hash::Hash;
17use std::marker::PhantomData;
18
19use aws_sdk_s3::types::Object;
20use risingwave_common::types::{JsonbVal, Timestamptz};
21use serde::{Deserialize, Serialize};
22use strum::Display;
23
24use super::opendal_source::OpendalSource;
25use crate::error::ConnectorResult;
26use crate::source::{SplitId, SplitMetaData};
27
/// Metadata for one split read by an OpenDAL-backed filesystem source.
///
/// `Src` selects the concrete [`OpendalSource`] implementation. It is carried only as a
/// type-level tag via [`PhantomData`] and stores no runtime data.
#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct OpendalFsSplit<Src: OpendalSource> {
    /// Name of the object/file this split reads. It is also the split's id.
    pub name: String,
    /// Current read position. It is 0 for splits built from an S3 `Object` and is overwritten
    /// by `update_offset`.
    pub offset: usize,
    /// Size recorded for the split (the S3 object size when built from an `Object`).
    pub size: usize,
    _marker: PhantomData<Src>,
}
38
39impl<Src: OpendalSource> From<&Object> for OpendalFsSplit<Src> {
40 fn from(value: &Object) -> Self {
41 Self {
42 name: value.key().unwrap().to_owned(),
43 offset: 0,
44 size: value.size().unwrap_or_default() as usize,
45 _marker: PhantomData,
46 }
47 }
48}
49
50impl<Src: OpendalSource> SplitMetaData for OpendalFsSplit<Src> {
51 fn id(&self) -> SplitId {
52 self.name.as_str().into()
53 }
54
55 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
56 serde_json::from_value(value.take()).map_err(Into::into)
57 }
58
59 fn encode_to_json(&self) -> JsonbVal {
60 serde_json::to_value(self.clone()).unwrap().into()
61 }
62
63 fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
64 let offset = last_seen_offset.parse().unwrap();
65 self.offset = offset;
66 Ok(())
67 }
68}
69
70impl<Src: OpendalSource> OpendalFsSplit<Src> {
71 pub fn new(name: String, start: usize, size: usize) -> Self {
72 Self {
73 name,
74 offset: start,
75 size,
76 _marker: PhantomData,
77 }
78 }
79
80 pub fn empty_split() -> Self {
81 Self {
82 name: "empty_split".to_owned(),
83 offset: 0,
84 size: 0,
85 _marker: PhantomData,
86 }
87 }
88}
89
/// One entry in a page of listed files/objects.
#[derive(Clone, Debug)]
pub struct FsPageItem {
    /// Name of the listed file/object.
    pub name: String,
    /// Size of the entry. NOTE(review): presumably in bytes; confirm against the lister that
    /// fills it.
    pub size: i64,
    /// Timestamp attached to the entry. NOTE(review): presumably the last-modified time;
    /// confirm.
    pub timestamp: Timestamptz,
}

/// A page of listed entries, as a plain `Vec`.
pub type FsPage = Vec<FsPageItem>;
98
/// Compression applied to source files. The default is `None`.
#[derive(Debug, Default, Clone, PartialEq, Display, Deserialize)]
pub enum CompressionFormat {
    /// No compression.
    // NOTE(review): this variant has no serde rename, so it deserializes only from the
    // literal "None". `Gzip` accepts lowercase "gzip"/"gz". Confirm whether "none" should be
    // accepted as an alias too.
    #[default]
    None,

    /// Gzip compression. Deserialized from "gzip" or its alias "gz".
    #[serde(rename = "gzip", alias = "gz")]
    Gzip,
}