risingwave_connector/sink/elasticsearch_opensearch/
opensearch.rs1use anyhow::anyhow;
16use risingwave_common::catalog::Schema;
17use risingwave_common::session_config::sink_decouple::SinkDecouple;
18use tonic::async_trait;
19
20use super::super::writer::{AsyncTruncateLogSinkerOf, AsyncTruncateSinkWriterExt};
21use super::super::{DummySinkCommitCoordinator, Sink, SinkError, SinkParam, SinkWriterParam};
22use super::elasticsearch_opensearch_client::ElasticSearchOpenSearchSinkWriter;
23use super::elasticsearch_opensearch_config::ElasticSearchOpenSearchConfig;
24use crate::enforce_secret::EnforceSecret;
25use crate::sink::Result;
26
27pub const OPENSEARCH_SINK: &str = "opensearch";
28
29#[derive(Debug)]
30pub struct OpenSearchSink {
31 config: ElasticSearchOpenSearchConfig,
32 schema: Schema,
33 pk_indices: Vec<usize>,
34 is_append_only: bool,
35}
36
37impl EnforceSecret for OpenSearchSink {
38 fn enforce_secret<'a>(
39 prop_iter: impl Iterator<Item = &'a str>,
40 ) -> crate::error::ConnectorResult<()> {
41 for prop in prop_iter {
42 ElasticSearchOpenSearchConfig::enforce_one(prop)?;
43 }
44 Ok(())
45 }
46}
47#[async_trait]
48impl TryFrom<SinkParam> for OpenSearchSink {
49 type Error = SinkError;
50
51 fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
52 let schema = param.schema();
53 let config = ElasticSearchOpenSearchConfig::from_btreemap(param.properties)?;
54 Ok(Self {
55 config,
56 schema,
57 pk_indices: param.downstream_pk,
58 is_append_only: param.sink_type.is_append_only(),
59 })
60 }
61}
62
63impl Sink for OpenSearchSink {
64 type Coordinator = DummySinkCommitCoordinator;
65 type LogSinker = AsyncTruncateLogSinkerOf<ElasticSearchOpenSearchSinkWriter>;
66
67 const SINK_NAME: &'static str = OPENSEARCH_SINK;
68
69 async fn validate(&self) -> Result<()> {
70 risingwave_common::license::Feature::OpenSearchSink
71 .check_available()
72 .map_err(|e| anyhow::anyhow!(e))?;
73 self.config.validate_config(&self.schema)?;
74 let client = self.config.build_client(Self::SINK_NAME)?;
75 client.ping().await?;
76 Ok(())
77 }
78
79 async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
80 Ok(ElasticSearchOpenSearchSinkWriter::new(
81 self.config.clone(),
82 self.schema.clone(),
83 self.pk_indices.clone(),
84 Self::SINK_NAME,
85 self.is_append_only,
86 )?
87 .into_log_sinker(self.config.concurrent_requests))
88 }
89
90 fn set_default_commit_checkpoint_interval(
91 desc: &mut crate::sink::catalog::desc::SinkDesc,
92 user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple,
93 ) -> Result<()> {
94 if crate::sink::is_sink_support_commit_checkpoint_interval(Self::SINK_NAME) {
95 match desc
96 .properties
97 .get(crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL)
98 {
99 Some(commit_checkpoint_interval) => {
100 let commit_checkpoint_interval = commit_checkpoint_interval
101 .parse::<u64>()
102 .map_err(|e| SinkError::Config(anyhow!(e)))?;
103 if std::matches!(user_specified, SinkDecouple::Disable)
104 && commit_checkpoint_interval > 1
105 {
106 return Err(SinkError::Config(anyhow!(
107 "config conflict: `commit_checkpoint_interval` larger than 1 means that sink decouple must be enabled, but session config sink_decouple is disabled"
108 )));
109 }
110 }
111 None => match user_specified {
112 risingwave_common::session_config::sink_decouple::SinkDecouple::Default
113 | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => {
114 desc.properties.insert(
115 crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_owned(),
116 crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITH_SINK_DECOUPLE.to_string(),
117 );
118 }
119 risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => {
120 desc.properties.insert(
121 crate::sink::decouple_checkpoint_log_sink::COMMIT_CHECKPOINT_INTERVAL.to_owned(),
122 crate::sink::decouple_checkpoint_log_sink::DEFAULT_COMMIT_CHECKPOINT_INTERVAL_WITHOUT_SINK_DECOUPLE.to_string(),
123 );
124 }
125 },
126 }
127 }
128 Ok(())
129 }
130
131 fn is_sink_decouple(
132 user_specified: &risingwave_common::session_config::sink_decouple::SinkDecouple,
133 ) -> Result<bool> {
134 match user_specified {
135 risingwave_common::session_config::sink_decouple::SinkDecouple::Default
136 | risingwave_common::session_config::sink_decouple::SinkDecouple::Enable => Ok(true),
137 risingwave_common::session_config::sink_decouple::SinkDecouple::Disable => Ok(false),
138 }
139 }
140}