risingwave_connector/source/filesystem/opendal_source/
batch_posix_fs_source.rs1use std::path::Path;
16
17use anyhow::anyhow;
18use async_trait::async_trait;
19use risingwave_common::types::JsonbVal;
20use serde::{Deserialize, Serialize};
21
22use super::BatchPosixFsProperties;
23use crate::error::ConnectorResult;
24use crate::parser::ParserConfig;
25use crate::source::batch::BatchSourceSplit;
26use crate::source::{
27 BoxSourceChunkStream, Column, SourceContextRef, SourceEnumeratorContextRef, SplitEnumerator,
28 SplitId, SplitMetaData, SplitReader,
29};
30
31#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
37pub struct BatchPosixFsSplit {
38 pub file_path: String,
41 pub split_id: SplitId,
43 #[serde(skip)]
46 pub finished: bool,
47}
48
49impl SplitMetaData for BatchPosixFsSplit {
50 fn id(&self) -> SplitId {
51 self.split_id.clone()
52 }
53
54 fn restore_from_json(value: JsonbVal) -> ConnectorResult<Self> {
55 serde_json::from_value(value.take()).map_err(|e| anyhow!(e).into())
56 }
57
58 fn encode_to_json(&self) -> JsonbVal {
59 serde_json::to_value(self.clone()).unwrap().into()
60 }
61
62 fn update_offset(&mut self, _last_seen_offset: String) -> ConnectorResult<()> {
63 Ok(())
65 }
66}
67
68impl BatchSourceSplit for BatchPosixFsSplit {
69 fn finished(&self) -> bool {
70 self.finished
71 }
72
73 fn finish(&mut self) {
74 self.finished = true;
75 }
76
77 fn refresh(&mut self) {
78 self.finished = false;
79 }
80}
81
82impl BatchPosixFsSplit {
83 pub fn new(file_path: String, split_id: SplitId) -> Self {
84 Self {
85 file_path,
86 split_id,
87 finished: false,
88 }
89 }
90
91 pub fn mark_finished(&mut self) {
92 self.finished = true;
93 }
94}
95
96#[derive(Debug)]
98pub struct BatchPosixFsEnumerator {
99 properties: BatchPosixFsProperties,
100}
101
102#[async_trait]
103impl SplitEnumerator for BatchPosixFsEnumerator {
104 type Properties = BatchPosixFsProperties;
105 type Split = BatchPosixFsSplit;
106
107 async fn new(
108 properties: Self::Properties,
109 _context: SourceEnumeratorContextRef,
110 ) -> ConnectorResult<Self> {
111 Ok(Self { properties })
112 }
113
114 async fn list_splits(&mut self) -> ConnectorResult<Vec<BatchPosixFsSplit>> {
115 let root_path = Path::new(&self.properties.root);
117
118 if !root_path.exists() {
119 return Err(anyhow!("Root directory does not exist: {}", self.properties.root).into());
120 }
121
122 Ok(vec![BatchPosixFsSplit::new(
124 self.properties.root.clone(), "114514".into(), )])
127 }
128}
129
/// Placeholder reader type for the batch POSIX filesystem source.
///
/// It carries no state; its `SplitReader` implementation in this file refuses
/// to be constructed.
#[derive(Debug)]
pub struct BatchPosixFsReader {}
133
134#[async_trait]
135impl SplitReader for BatchPosixFsReader {
136 type Properties = BatchPosixFsProperties;
137 type Split = BatchPosixFsSplit;
138
139 async fn new(
140 _properties: Self::Properties,
141 _splits: Vec<Self::Split>,
142 _parser_config: ParserConfig,
143 _source_ctx: SourceContextRef,
144 _columns: Option<Vec<Column>>,
145 ) -> ConnectorResult<Self> {
146 return Err(anyhow!("BatchPosixFsReader should not be used").into());
147 }
148
149 fn into_stream(self) -> BoxSourceChunkStream {
150 unreachable!(
151 "BatchPosixFsReader should not hit this branch. refer to `batch_posix_fs_list.rs`, `batch_posix_fs_fetch.rs`"
152 );
153 }
154}