risingwave_connector/sink/elasticsearch_opensearch/
elasticsearch.rs1use risingwave_common::catalog::Schema;
16use tonic::async_trait;
17
18use super::super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriterExt};
19use super::super::{DummySinkCommitCoordinator, Sink, SinkError, SinkParam, SinkWriterParam};
20use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchSinkWriter;
21use super::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig;
22use crate::enforce_secret::EnforceSecret;
23use crate::sink::Result;
24
/// Connector name of the Elasticsearch sink; also exposed as [`Sink::SINK_NAME`]
/// for [`ElasticSearchSink`].
pub const ES_SINK: &str = "elasticsearch";
26
27#[derive(Debug)]
28pub struct ElasticSearchSink {
29 config: ElasticSearchOpenSearchConfig,
30 schema: Schema,
31 pk_indices: Vec<usize>,
32 is_append_only: bool,
33}
34
35impl EnforceSecret for ElasticSearchSink {
36 fn enforce_secret<'a>(
37 prop_iter: impl Iterator<Item = &'a str>,
38 ) -> crate::error::ConnectorResult<()> {
39 for prop in prop_iter {
40 ElasticSearchOpenSearchConfig::enforce_one(prop)?;
41 }
42 Ok(())
43 }
44}
45
46#[async_trait]
47impl TryFrom<SinkParam> for ElasticSearchSink {
48 type Error = SinkError;
49
50 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
51 let schema = param.schema();
52 let config = ElasticSearchOpenSearchConfig::from_btreemap(param.properties)?;
53 Ok(Self {
54 config,
55 schema,
56 pk_indices: param.downstream_pk,
57 is_append_only: param.sink_type.is_append_only(),
58 })
59 }
60}
61
62impl Sink for ElasticSearchSink {
63 type Coordinator = DummySinkCommitCoordinator;
64 type LogSinker = AsyncTruncateLogSinkerOf<ElasticSearchOpenSearchSinkWriter>;
65
66 const SINK_NAME: &'static str = ES_SINK;
67
68 async fn validate(&self) -> Result<()> {
69 self.config.validate_config(&self.schema)?;
70 let client = self.config.build_client(Self::SINK_NAME)?;
71 client.ping().await?;
72 Ok(())
73 }
74
75 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
76 Ok(ElasticSearchOpenSearchSinkWriter::new(
77 self.config.clone(),
78 self.schema.clone(),
79 self.pk_indices.clone(),
80 Self::SINK_NAME,
81 self.is_append_only,
82 )?
83 .into_log_sinker(self.config.concurrent_requests))
84 }
85}