risingwave_connector/sink/elasticsearch_opensearch/
elasticsearch_converter.rs1use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use risingwave_common::array::{
19 ArrayBuilder, ArrayImpl, JsonbArrayBuilder, StreamChunk, Utf8ArrayBuilder,
20};
21use risingwave_common::catalog::Schema;
22use risingwave_common::types::{JsonbVal, Scalar};
23use serde_json::Value;
24
25use super::elasticsearch_opensearch_config::{
26 ES_OPTION_DELIMITER, ES_OPTION_INDEX, ES_OPTION_INDEX_COLUMN, ES_OPTION_ROUTING_COLUMN,
27};
28use super::elasticsearch_opensearch_formatter::{BuildBulkPara, ElasticSearchOpenSearchFormatter};
29use crate::sink::Result;
30
31#[expect(clippy::large_enum_variant)]
32pub enum StreamChunkConverter {
33 Es(EsStreamChunkConverter),
34 Other,
35}
36impl StreamChunkConverter {
37 pub fn new(
38 sink_name: &str,
39 schema: Schema,
40 pk_indices: &Vec<usize>,
41 properties: &BTreeMap<String, String>,
42 is_append_only: bool,
43 ) -> Result<Self> {
44 if is_remote_es_sink(sink_name) {
45 let index_column = properties
46 .get(ES_OPTION_INDEX_COLUMN)
47 .cloned()
48 .map(|n| {
49 schema
50 .fields()
51 .iter()
52 .position(|s| s.name == n)
53 .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_INDEX_COLUMN))
54 })
55 .transpose()?;
56 let index = properties.get(ES_OPTION_INDEX).cloned();
57 let routing_column = properties
58 .get(ES_OPTION_ROUTING_COLUMN)
59 .cloned()
60 .map(|n| {
61 schema
62 .fields()
63 .iter()
64 .position(|s| s.name == n)
65 .ok_or_else(|| anyhow!("Cannot find {}", ES_OPTION_ROUTING_COLUMN))
66 })
67 .transpose()?;
68 Ok(StreamChunkConverter::Es(EsStreamChunkConverter::new(
69 schema,
70 pk_indices.clone(),
71 properties.get(ES_OPTION_DELIMITER).cloned(),
72 index_column,
73 index,
74 routing_column,
75 is_append_only,
76 )?))
77 } else {
78 Ok(StreamChunkConverter::Other)
79 }
80 }
81
82 pub fn convert_chunk(&self, chunk: StreamChunk) -> Result<StreamChunk> {
83 match self {
84 StreamChunkConverter::Es(es) => es.convert_chunk(chunk, es.is_append_only),
85 StreamChunkConverter::Other => Ok(chunk),
86 }
87 }
88}
89pub struct EsStreamChunkConverter {
90 formatter: ElasticSearchOpenSearchFormatter,
91 is_append_only: bool,
92}
93impl EsStreamChunkConverter {
94 pub fn new(
95 schema: Schema,
96 pk_indices: Vec<usize>,
97 delimiter: Option<String>,
98 index_column: Option<usize>,
99 index: Option<String>,
100 routing_column: Option<usize>,
101 is_append_only: bool,
102 ) -> Result<Self> {
103 let formatter = ElasticSearchOpenSearchFormatter::new(
104 pk_indices,
105 &schema,
106 delimiter,
107 index_column,
108 index,
109 routing_column,
110 )?;
111 Ok(Self {
112 formatter,
113 is_append_only,
114 })
115 }
116
117 fn convert_chunk(&self, chunk: StreamChunk, is_append_only: bool) -> Result<StreamChunk> {
118 let mut ops = Vec::with_capacity(chunk.capacity());
119 let mut id_string_builder =
120 <Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
121 let mut json_builder =
122 <JsonbArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
123 let mut index_builder =
124 <Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
125 let mut routing_builder =
126 <Utf8ArrayBuilder as risingwave_common::array::ArrayBuilder>::new(chunk.capacity());
127 for build_bulk_para in self.formatter.convert_chunk(chunk, is_append_only)? {
128 let BuildBulkPara {
129 key,
130 value,
131 index,
132 routing_column,
133 ..
134 } = build_bulk_para;
135
136 id_string_builder.append(Some(&key));
137 index_builder.append(Some(&index));
138 routing_builder.append(routing_column.as_deref());
139 if value.is_some() {
140 ops.push(risingwave_common::array::Op::Insert);
141 } else {
142 ops.push(risingwave_common::array::Op::Delete);
143 }
144 let value = value.map(|json| JsonbVal::from(Value::Object(json)));
145 json_builder.append(value.as_ref().map(|json| json.as_scalar_ref()));
146 }
147 let json_array = risingwave_common::array::ArrayBuilder::finish(json_builder);
148 let id_string_array = risingwave_common::array::ArrayBuilder::finish(id_string_builder);
149 let index_string_array = risingwave_common::array::ArrayBuilder::finish(index_builder);
150 let routing_string_array = risingwave_common::array::ArrayBuilder::finish(routing_builder);
151 Ok(StreamChunk::new(
152 ops,
153 vec![
154 std::sync::Arc::new(ArrayImpl::Utf8(index_string_array)),
155 std::sync::Arc::new(ArrayImpl::Utf8(id_string_array)),
156 std::sync::Arc::new(ArrayImpl::Jsonb(json_array)),
157 std::sync::Arc::new(ArrayImpl::Utf8(routing_string_array)),
158 ],
159 ))
160 }
161}
162
/// Reports whether the sink called `_sink_name` is a remote Elasticsearch sink.
///
/// The name is currently ignored and the answer is always `false`, so
/// `StreamChunkConverter::new` always selects the pass-through converter.
pub fn is_remote_es_sink(_sink_name: &str) -> bool {
    false
}