use core::pin::Pin;
use core::time::Duration;
use std::collections::{BTreeMap, HashMap, VecDeque};

use anyhow::{Context, anyhow};
use futures::future::pending;
use futures::prelude::Future;
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use gcp_bigquery_client::Client;
use gcp_bigquery_client::error::BQError;
use gcp_bigquery_client::model::query_request::QueryRequest;
use gcp_bigquery_client::model::table::Table;
use gcp_bigquery_client::model::table_field_schema::TableFieldSchema;
use gcp_bigquery_client::model::table_schema::TableSchema;
use google_cloud_bigquery::grpc::apiv1::conn_pool::{DOMAIN, WriteConnectionManager};
use google_cloud_gax::conn::{ConnectionOptions, Environment};
use google_cloud_gax::grpc::Request;
use google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_request::{
    MissingValueInterpretation, ProtoData, Rows as AppendRowsRequestRows,
};
use google_cloud_googleapis::cloud::bigquery::storage::v1::{
    AppendRowsRequest, AppendRowsResponse, ProtoRows, ProtoSchema,
};
use google_cloud_pubsub::client::google_cloud_auth;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use prost_reflect::{FieldDescriptor, MessageDescriptor};
use prost_types::{
    DescriptorProto, FieldDescriptorProto, FileDescriptorProto, FileDescriptorSet,
    field_descriptor_proto,
};
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use serde_derive::Deserialize;
use serde_with::{DisplayFromStr, serde_as};
use simd_json::prelude::ArrayTrait;
use tokio::sync::mpsc;
use tonic::{Response, Status, async_trait};
use url::Url;
use uuid::Uuid;
use with_options::WithOptions;
use yup_oauth2::ServiceAccountKey;

use super::encoder::{ProtoEncoder, ProtoHeader, RowEncoder, SerTo};
use super::log_store::{LogStoreReadItem, TruncateOffset};
use super::{
    LogSinker, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError, SinkLogReader,
};
use crate::aws_utils::load_file_descriptor_from_s3;
use crate::connector_common::AwsAuthProps;
use crate::sink::{DummySinkCommitCoordinator, Result, Sink, SinkParam, SinkWriterParam};

pub const BIGQUERY_SINK: &str = "bigquery";
pub const CHANGE_TYPE: &str = "_CHANGE_TYPE";
const DEFAULT_GRPC_CHANNEL_NUMS: usize = 4;
const CONNECT_TIMEOUT: Option<Duration> = Some(Duration::from_secs(30));
const CONNECTION_TIMEOUT: Option<Duration> = None;
const BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;
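// Serialized rows are batched into requests of at most this many bytes;
// BigQuery caps an `AppendRows` request at 10 MB, so 8 MB leaves headroom.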
const MAX_ROW_SIZE: usize = 8 * 1024 * 1024;

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct BigQueryCommon {
    #[serde(rename = "bigquery.local.path")]
    pub local_path: Option<String>,
    #[serde(rename = "bigquery.s3.path")]
    pub s3_path: Option<String>,
    #[serde(rename = "bigquery.project")]
    pub project: String,
    #[serde(rename = "bigquery.dataset")]
    pub dataset: String,
    #[serde(rename = "bigquery.table")]
    pub table: String,
    #[serde(default)]
    #[serde_as(as = "DisplayFromStr")]
    pub auto_create: bool,
    #[serde(rename = "bigquery.credentials")]
    pub credentials: Option<String>,
}

struct BigQueryFutureManager {
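    // Each entry pairs a `TruncateOffset` with the number of responses still
    // expected on `resp_stream` before that offset may be truncated. Barriers
    // are enqueued with a count of 0; a chunk is enqueued with one count per
    // `AppendRowsRequest` it was split into.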
    offset_queue: VecDeque<(TruncateOffset, usize)>,
    resp_stream: Pin<Box<dyn Stream<Item = Result<()>> + Send>>,
}
impl BigQueryFutureManager {
    pub fn new(
        max_future_num: usize,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
    ) -> Self {
        let offset_queue = VecDeque::with_capacity(max_future_num);
        Self {
            offset_queue,
            resp_stream: Box::pin(resp_stream),
        }
    }

    pub fn add_offset(&mut self, offset: TruncateOffset, resp_num: usize) {
        self.offset_queue.push_back((offset, resp_num));
    }

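    /// Waits until all pending responses for the oldest queued offset have
    /// arrived, then pops and returns it. Pends forever when the queue is
    /// empty, so a `select!` over this future simply never fires until an
    /// offset is enqueued.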
    pub async fn next_offset(&mut self) -> Result<TruncateOffset> {
        if let Some((_offset, remaining_resp_num)) = self.offset_queue.front_mut() {
            if *remaining_resp_num == 0 {
                return Ok(self.offset_queue.pop_front().unwrap().0);
            }
            while *remaining_resp_num > 0 {
                self.resp_stream
                    .next()
                    .await
                    .ok_or_else(|| SinkError::BigQuery(anyhow::anyhow!("end of stream")))??;
                *remaining_resp_num -= 1;
            }
            Ok(self.offset_queue.pop_front().unwrap().0)
        } else {
            pending().await
        }
    }
}
pub struct BigQueryLogSinker {
    writer: BigQuerySinkWriter,
    bigquery_future_manager: BigQueryFutureManager,
    future_num: usize,
}
impl BigQueryLogSinker {
    pub fn new(
        writer: BigQuerySinkWriter,
        resp_stream: impl Stream<Item = Result<()>> + Send + 'static,
        future_num: usize,
    ) -> Self {
        Self {
            writer,
            bigquery_future_manager: BigQueryFutureManager::new(future_num, resp_stream),
            future_num,
        }
    }
}

#[async_trait]
impl LogSinker for BigQueryLogSinker {
    async fn consume_log_and_sink(mut self, mut log_reader: impl SinkLogReader) -> Result<!> {
        log_reader.start_from(None).await?;
        loop {
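            // Either report a fully-acknowledged offset back to the log store,
            // or (only while at most `future_num` offsets are in flight, for
            // backpressure) pull the next item, send it, and record how many
            // responses to expect for it.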
            tokio::select!(
                offset = self.bigquery_future_manager.next_offset() => {
                    log_reader.truncate(offset?)?;
                }
                item_result = log_reader.next_item(), if self.bigquery_future_manager.offset_queue.len() <= self.future_num => {
                    let (epoch, item) = item_result?;
                    match item {
                        LogStoreReadItem::StreamChunk { chunk_id, chunk } => {
                            let resp_num = self.writer.write_chunk(chunk)?;
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Chunk { epoch, chunk_id }, resp_num);
                        }
                        LogStoreReadItem::Barrier { .. } => {
                            self.bigquery_future_manager
                                .add_offset(TruncateOffset::Barrier { epoch }, 0);
                        }
                    }
                }
            )
        }
    }
}

impl BigQueryCommon {
    async fn build_client(&self, aws_auth_props: &AwsAuthProps) -> Result<Client> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let service_account = serde_json::from_str::<ServiceAccountKey>(&auth_json)
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        let client: Client = Client::from_service_account_key(service_account, false)
            .await
            .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
        Ok(client)
    }

    async fn build_writer_client(
        &self,
        aws_auth_props: &AwsAuthProps,
    ) -> Result<(StorageWriterClient, impl Stream<Item = Result<()>> + use<>)> {
        let auth_json = self.get_auth_json_from_path(aws_auth_props).await?;

        let credentials_file = CredentialsFile::new_from_str(&auth_json)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        StorageWriterClient::new(credentials_file).await
    }

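    /// Resolves the service-account JSON: inline `bigquery.credentials` first,
    /// then `bigquery.local.path`, then `bigquery.s3.path`, in that order.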
    async fn get_auth_json_from_path(&self, aws_auth_props: &AwsAuthProps) -> Result<String> {
        if let Some(credentials) = &self.credentials {
            Ok(credentials.clone())
        } else if let Some(local_path) = &self.local_path {
            std::fs::read_to_string(local_path)
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))
        } else if let Some(s3_path) = &self.s3_path {
            let url =
                Url::parse(s3_path).map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            let auth_vec = load_file_descriptor_from_s3(&url, aws_auth_props)
                .await
                .map_err(|err| SinkError::BigQuery(anyhow::anyhow!(err)))?;
            Ok(String::from_utf8(auth_vec).map_err(|e| SinkError::BigQuery(e.into()))?)
        } else {
            Err(SinkError::BigQuery(anyhow::anyhow!(
                "At least one of `bigquery.credentials`, `bigquery.local.path`, or `bigquery.s3.path` must be set."
            )))
        }
    }
}

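/// Complete sink configuration: the BigQuery options above, the flattened AWS
/// auth options (used when credentials are loaded from S3), and the sink
/// `type`, which must be `append-only` or `upsert`.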
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct BigQueryConfig {
    #[serde(flatten)]
    pub common: BigQueryCommon,
    #[serde(flatten)]
    pub aws_auth_props: AwsAuthProps,
    pub r#type: String,
}
impl BigQueryConfig {
    pub fn from_btreemap(properties: BTreeMap<String, String>) -> Result<Self> {
        let config =
            serde_json::from_value::<BigQueryConfig>(serde_json::to_value(properties).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
}

#[derive(Debug)]
pub struct BigQuerySink {
    pub config: BigQueryConfig,
    schema: Schema,
    pk_indices: Vec<usize>,
    is_append_only: bool,
}

impl BigQuerySink {
    pub fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<Self> {
        Ok(Self {
            config,
            schema,
            pk_indices,
            is_append_only,
        })
    }
}

impl BigQuerySink {
    fn check_column_name_and_type(
        &self,
        big_query_columns_desc: HashMap<String, String>,
    ) -> Result<()> {
        let rw_fields_name = self.schema.fields();
        if big_query_columns_desc.is_empty() {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "Cannot find table in BigQuery"
            )));
        }
        if rw_fields_name.len() != big_query_columns_desc.len() {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "The number of columns in RisingWave ({}) must equal the number of columns in BigQuery ({})",
                rw_fields_name.len(),
                big_query_columns_desc.len()
            )));
        }

        for i in rw_fields_name {
            let value = big_query_columns_desc.get(&i.name).ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Column `{:?}` on RisingWave side is not found on BigQuery side.",
                    i.name
                ))
            })?;
            let data_type_string = Self::get_string_and_check_support_from_datatype(&i.data_type)?;
            if data_type_string.ne(value) {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "Data type mismatch for column `{:?}`. BigQuery side: `{:?}`, RisingWave side: `{:?}`.",
                    i.name,
                    value,
                    data_type_string
                )));
            };
        }
        Ok(())
    }

    fn get_string_and_check_support_from_datatype(rw_data_type: &DataType) -> Result<String> {
        match rw_data_type {
            DataType::Boolean => Ok("BOOL".to_owned()),
            DataType::Int16 => Ok("INT64".to_owned()),
            DataType::Int32 => Ok("INT64".to_owned()),
            DataType::Int64 => Ok("INT64".to_owned()),
            DataType::Float32 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "BigQuery does not support real"
            ))),
            DataType::Float64 => Ok("FLOAT64".to_owned()),
            DataType::Decimal => Ok("NUMERIC".to_owned()),
            DataType::Date => Ok("DATE".to_owned()),
            DataType::Varchar => Ok("STRING".to_owned()),
            DataType::Time => Ok("TIME".to_owned()),
            DataType::Timestamp => Ok("DATETIME".to_owned()),
            DataType::Timestamptz => Ok("TIMESTAMP".to_owned()),
            DataType::Interval => Ok("INTERVAL".to_owned()),
            DataType::Struct(structs) => {
                let mut elements_vec = vec![];
                for (name, datatype) in structs.iter() {
                    let element_string =
                        Self::get_string_and_check_support_from_datatype(datatype)?;
                    elements_vec.push(format!("{} {}", name, element_string));
                }
                Ok(format!("STRUCT<{}>", elements_vec.join(", ")))
            }
            DataType::List(l) => {
                let element_string = Self::get_string_and_check_support_from_datatype(l.as_ref())?;
                Ok(format!("ARRAY<{}>", element_string))
            }
            DataType::Bytea => Ok("BYTES".to_owned()),
            DataType::Jsonb => Ok("JSON".to_owned()),
            DataType::Serial => Ok("INT64".to_owned()),
            DataType::Int256 => Err(SinkError::BigQuery(anyhow::anyhow!(
                "BigQuery does not support Int256"
            ))),
            DataType::Map(_) => Err(SinkError::BigQuery(anyhow::anyhow!(
                "BigQuery does not support Map"
            ))),
        }
    }

    fn map_field(rw_field: &Field) -> Result<TableFieldSchema> {
        let tfs = match &rw_field.data_type {
            DataType::Boolean => TableFieldSchema::bool(&rw_field.name),
            DataType::Int16 | DataType::Int32 | DataType::Int64 | DataType::Serial => {
                TableFieldSchema::integer(&rw_field.name)
            }
            DataType::Float32 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "BigQuery does not support real"
                )));
            }
            DataType::Float64 => TableFieldSchema::float(&rw_field.name),
            DataType::Decimal => TableFieldSchema::numeric(&rw_field.name),
            DataType::Date => TableFieldSchema::date(&rw_field.name),
            DataType::Varchar => TableFieldSchema::string(&rw_field.name),
            DataType::Time => TableFieldSchema::time(&rw_field.name),
            DataType::Timestamp => TableFieldSchema::date_time(&rw_field.name),
            DataType::Timestamptz => TableFieldSchema::timestamp(&rw_field.name),
            DataType::Interval => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "BigQuery does not support Interval"
                )));
            }
            DataType::Struct(st) => {
                let mut sub_fields = Vec::with_capacity(st.len());
                for (name, dt) in st.iter() {
                    let rw_field = Field::with_name(dt.clone(), name);
                    let field = Self::map_field(&rw_field)?;
                    sub_fields.push(field);
                }
                TableFieldSchema::record(&rw_field.name, sub_fields)
            }
            DataType::List(dt) => {
                let inner_field = Self::map_field(&Field::with_name(*dt.clone(), &rw_field.name))?;
                TableFieldSchema {
                    mode: Some("REPEATED".to_owned()),
                    ..inner_field
                }
            }
            DataType::Bytea => TableFieldSchema::bytes(&rw_field.name),
            DataType::Jsonb => TableFieldSchema::json(&rw_field.name),
            DataType::Int256 => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "BigQuery does not support Int256"
                )));
            }
            DataType::Map(_) => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "BigQuery does not support Map"
                )));
            }
        };
        Ok(tfs)
    }

    async fn create_table(
        &self,
        client: &Client,
        project_id: &str,
        dataset_id: &str,
        table_id: &str,
        fields: &[Field],
    ) -> Result<Table> {
        let dataset = client
            .dataset()
            .get(project_id, dataset_id)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        let fields: Vec<_> = fields.iter().map(Self::map_field).collect::<Result<_>>()?;
        let table = Table::from_dataset(&dataset, table_id, TableSchema::new(fields));

        client
            .table()
            .create(table)
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))
    }
}

impl Sink for BigQuerySink {
    type Coordinator = DummySinkCommitCoordinator;
    type LogSinker = BigQueryLogSinker;

    const SINK_NAME: &'static str = BIGQUERY_SINK;

    async fn new_log_sinker(&self, _writer_param: SinkWriterParam) -> Result<Self::LogSinker> {
        let (writer, resp_stream) = BigQuerySinkWriter::new(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
            self.is_append_only,
        )
        .await?;
        Ok(BigQueryLogSinker::new(
            writer,
            resp_stream,
            BIGQUERY_SEND_FUTURE_BUFFER_MAX_SIZE,
        ))
    }

    async fn validate(&self) -> Result<()> {
        risingwave_common::license::Feature::BigQuerySink
            .check_available()
            .map_err(|e| anyhow::anyhow!(e))?;
        if !self.is_append_only && self.pk_indices.is_empty() {
            return Err(SinkError::Config(anyhow!(
                "Primary key not defined for upsert bigquery sink (please define in `primary_key` field)"
            )));
        }
        let client = self
            .config
            .common
            .build_client(&self.config.aws_auth_props)
            .await?;
        let BigQueryCommon {
            project: project_id,
            dataset: dataset_id,
            table: table_id,
            ..
        } = &self.config.common;

        if self.config.common.auto_create {
            match client
                .table()
                .get(project_id, dataset_id, table_id, None)
                .await
            {
                Err(BQError::RequestError(_)) => {
                    return self
                        .create_table(
                            &client,
                            project_id,
                            dataset_id,
                            table_id,
                            &self.schema.fields,
                        )
                        .await
                        .map(|_| ());
                }
                Err(e) => return Err(SinkError::BigQuery(e.into())),
                _ => {}
            }
        }

        let mut rs = client
            .job()
            .query(
                &self.config.common.project,
                QueryRequest::new(format!(
                    "SELECT column_name, data_type FROM `{}.{}.INFORMATION_SCHEMA.COLUMNS` WHERE table_name = '{}'",
                    project_id, dataset_id, table_id,
                )),
            )
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?;

        let mut big_query_schema = HashMap::default();
        while rs.next_row() {
            big_query_schema.insert(
                rs.get_string_by_name("column_name")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find column_name"))
                    })?,
                rs.get_string_by_name("data_type")
                    .map_err(|e| SinkError::BigQuery(e.into()))?
                    .ok_or_else(|| {
                        SinkError::BigQuery(anyhow::anyhow!("Cannot find data_type"))
                    })?,
            );
        }

        self.check_column_name_and_type(big_query_schema)?;
        Ok(())
    }
}

pub struct BigQuerySinkWriter {
    pub config: BigQueryConfig,
    #[expect(dead_code)]
    schema: Schema,
    #[expect(dead_code)]
    pk_indices: Vec<usize>,
    client: StorageWriterClient,
    is_append_only: bool,
    row_encoder: ProtoEncoder,
    writer_pb_schema: ProtoSchema,
    #[expect(dead_code)]
    message_descriptor: MessageDescriptor,
    write_stream: String,
    proto_field: Option<FieldDescriptor>,
}

impl TryFrom<SinkParam> for BigQuerySink {
    type Error = SinkError;

    fn try_from(param: SinkParam) -> std::result::Result<Self, Self::Error> {
        let schema = param.schema();
        let config = BigQueryConfig::from_btreemap(param.properties)?;
        BigQuerySink::new(
            config,
            schema,
            param.downstream_pk,
            param.sink_type.is_append_only(),
        )
    }
}

impl BigQuerySinkWriter {
    pub async fn new(
        config: BigQueryConfig,
        schema: Schema,
        pk_indices: Vec<usize>,
        is_append_only: bool,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let (client, resp_stream) = config
            .common
            .build_writer_client(&config.aws_auth_props)
            .await?;
        let mut descriptor_proto = build_protobuf_schema(
            schema
                .fields()
                .iter()
                .map(|f| (f.name.as_str(), &f.data_type)),
            config.common.table.clone(),
        )?;

        if !is_append_only {
            let field = FieldDescriptorProto {
                name: Some(CHANGE_TYPE.to_owned()),
                number: Some((schema.len() + 1) as i32),
                r#type: Some(field_descriptor_proto::Type::String.into()),
                ..Default::default()
            };
            descriptor_proto.field.push(field);
        }

        let descriptor_pool = build_protobuf_descriptor_pool(&descriptor_proto)?;
        let message_descriptor = descriptor_pool
            .get_message_by_name(&config.common.table)
            .ok_or_else(|| {
                SinkError::BigQuery(anyhow::anyhow!(
                    "Can't find message proto {}",
                    &config.common.table
                ))
            })?;
        let proto_field = if !is_append_only {
            let proto_field = message_descriptor
                .get_field_by_name(CHANGE_TYPE)
                .ok_or_else(|| {
                    SinkError::BigQuery(anyhow::anyhow!("Can't find {}", CHANGE_TYPE))
                })?;
            Some(proto_field)
        } else {
            None
        };
        let row_encoder = ProtoEncoder::new(
            schema.clone(),
            None,
            message_descriptor.clone(),
            ProtoHeader::None,
        )?;
        Ok((
            Self {
                write_stream: format!(
                    "projects/{}/datasets/{}/tables/{}/streams/_default",
                    config.common.project, config.common.dataset, config.common.table
                ),
                config,
                schema,
                pk_indices,
                client,
                is_append_only,
                row_encoder,
                message_descriptor,
                proto_field,
                writer_pb_schema: ProtoSchema {
                    proto_descriptor: Some(descriptor_proto),
                },
            },
            resp_stream,
        ))
    }

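    /// Serializes the `Insert` rows of a chunk; all other ops are ignored in
    /// append-only mode.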
    fn append_only(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op != Op::Insert {
                continue;
            }
            serialized_rows.push(self.row_encoder.encode(row)?.ser_to()?)
        }
        Ok(serialized_rows)
    }

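    /// Serializes rows for BigQuery CDC: every row carries the `_CHANGE_TYPE`
    /// pseudo-column, set to `UPSERT` for inserts/updates and `DELETE` for
    /// deletes. `UpdateDelete` is skipped because the paired `UpdateInsert`
    /// already carries the new value for the key.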
    fn upsert(&mut self, chunk: StreamChunk) -> Result<Vec<Vec<u8>>> {
        let mut serialized_rows: Vec<Vec<u8>> = Vec::with_capacity(chunk.capacity());
        for (op, row) in chunk.rows() {
            if op == Op::UpdateDelete {
                continue;
            }
            let mut pb_row = self.row_encoder.encode(row)?;
            let change_type = match op {
                Op::Insert | Op::UpdateInsert => "UPSERT",
                Op::Delete => "DELETE",
                Op::UpdateDelete => continue,
            };
            pb_row
                .message
                .try_set_field(
                    self.proto_field.as_ref().unwrap(),
                    prost_reflect::Value::String(change_type.to_owned()),
                )
                .map_err(|e| SinkError::BigQuery(e.into()))?;

            serialized_rows.push(pb_row.ser_to()?)
        }
        Ok(serialized_rows)
    }

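    /// Serializes a chunk and sends it as one or more `AppendRowsRequest`s,
    /// each holding at most `MAX_ROW_SIZE` bytes of serialized rows. Returns
    /// the number of requests sent, i.e. how many responses the future
    /// manager must await before this chunk's offset can be truncated.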
    fn write_chunk(&mut self, chunk: StreamChunk) -> Result<usize> {
        let serialized_rows = if self.is_append_only {
            self.append_only(chunk)?
        } else {
            self.upsert(chunk)?
        };
        if serialized_rows.is_empty() {
            return Ok(0);
        }
        let mut result = Vec::new();
        let mut result_inner = Vec::new();
        let mut size_count = 0;
        for i in serialized_rows {
            size_count += i.len();
            // Avoid emitting an empty batch when a single row alone exceeds
            // the limit; an oversized row is then sent as its own request.
            if size_count > MAX_ROW_SIZE && !result_inner.is_empty() {
                result.push(result_inner);
                result_inner = Vec::new();
                size_count = i.len();
            }
            result_inner.push(i);
        }
        if !result_inner.is_empty() {
            result.push(result_inner);
        }
        let len = result.len();
        for serialized_rows in result {
            let rows = AppendRowsRequestRows::ProtoRows(ProtoData {
                writer_schema: Some(self.writer_pb_schema.clone()),
                rows: Some(ProtoRows { serialized_rows }),
            });
            self.client.append_rows(rows, self.write_stream.clone())?;
        }
        Ok(len)
    }
}

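/// Adapts the raw gRPC `AppendRows` response stream into a
/// `Stream<Item = Result<()>>`: one `()` per successful response, or an error
/// when a response reports row errors or an error status, or when the stream
/// ends unexpectedly.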
#[try_stream(ok = (), error = SinkError)]
pub async fn resp_to_stream(
    resp_stream: impl Future<
        Output = std::result::Result<
            Response<google_cloud_gax::grpc::Streaming<AppendRowsResponse>>,
            Status,
        >,
    >
    + 'static
    + Send,
) {
    let mut resp_stream = resp_stream
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?
        .into_inner();
    loop {
        match resp_stream
            .message()
            .await
            .map_err(|e| SinkError::BigQuery(e.into()))?
        {
            Some(append_rows_response) => {
                if !append_rows_response.row_errors.is_empty() {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        append_rows_response.row_errors
                    )));
                }
                if let Some(google_cloud_googleapis::cloud::bigquery::storage::v1::append_rows_response::Response::Error(status)) =
                    append_rows_response.response
                {
                    return Err(SinkError::BigQuery(anyhow::anyhow!(
                        "bigquery insert error {:?}",
                        status
                    )));
                }
                yield ();
            }
            None => {
                return Err(SinkError::BigQuery(anyhow::anyhow!(
                    "bigquery insert error: end of resp stream",
                )));
            }
        }
    }
}

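/// Thin client over the BigQuery Storage Write API. Requests sent through
/// `request_sender` are forwarded over a single long-lived `AppendRows` gRPC
/// stream; the matching responses are surfaced as the stream returned by
/// `new`.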
struct StorageWriterClient {
    #[expect(dead_code)]
    environment: Environment,
    request_sender: mpsc::UnboundedSender<AppendRowsRequest>,
}
impl StorageWriterClient {
    pub async fn new(
        credentials: CredentialsFile,
    ) -> Result<(Self, impl Stream<Item = Result<()>>)> {
        let ts_grpc = google_cloud_auth::token::DefaultTokenSourceProvider::new_with_credentials(
            Self::bigquery_grpc_auth_config(),
            Box::new(credentials),
        )
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?;
        let conn_options = ConnectionOptions {
            connect_timeout: CONNECT_TIMEOUT,
            timeout: CONNECTION_TIMEOUT,
        };
        let environment = Environment::GoogleCloud(Box::new(ts_grpc));
        let conn = WriteConnectionManager::new(
            DEFAULT_GRPC_CHANNEL_NUMS,
            &environment,
            DOMAIN,
            &conn_options,
        )
        .await
        .map_err(|e| SinkError::BigQuery(e.into()))?;
        let mut client = conn.conn();

        let (tx, rx) = mpsc::unbounded_channel();
        let stream = tokio_stream::wrappers::UnboundedReceiverStream::new(rx);

        let resp = async move { client.append_rows(Request::new(stream)).await };
        let resp_stream = resp_to_stream(resp);

        Ok((
            StorageWriterClient {
                environment,
                request_sender: tx,
            },
            resp_stream,
        ))
    }

    pub fn append_rows(&mut self, row: AppendRowsRequestRows, write_stream: String) -> Result<()> {
        let append_req = AppendRowsRequest {
            write_stream: write_stream.clone(),
            offset: None,
            trace_id: Uuid::new_v4().hyphenated().to_string(),
            missing_value_interpretations: HashMap::default(),
            rows: Some(row),
            default_missing_value_interpretation: MissingValueInterpretation::DefaultValue as i32,
        };
        self.request_sender
            .send(append_req)
            .map_err(|e| SinkError::BigQuery(e.into()))?;
        Ok(())
    }

    fn bigquery_grpc_auth_config() -> google_cloud_auth::project::Config<'static> {
        let mut auth_config = google_cloud_auth::project::Config::default();
        auth_config =
            auth_config.with_audience(google_cloud_bigquery::grpc::apiv1::conn_pool::AUDIENCE);
        auth_config =
            auth_config.with_scopes(&google_cloud_bigquery::grpc::apiv1::conn_pool::SCOPES);
        auth_config
    }
}

fn build_protobuf_descriptor_pool(desc: &DescriptorProto) -> Result<prost_reflect::DescriptorPool> {
    let file_descriptor = FileDescriptorProto {
        message_type: vec![desc.clone()],
        name: Some("bigquery".to_owned()),
        ..Default::default()
    };

    prost_reflect::DescriptorPool::from_file_descriptor_set(FileDescriptorSet {
        file: vec![file_descriptor],
    })
    .context("failed to build descriptor pool")
    .map_err(SinkError::BigQuery)
}

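/// Builds a protobuf `DescriptorProto` for the sink schema: scalars map to
/// proto scalar types (see `build_protobuf_field`), structs become nested
/// `Struct<name>` message types, and lists become `repeated` fields.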
fn build_protobuf_schema<'a>(
    fields: impl Iterator<Item = (&'a str, &'a DataType)>,
    name: String,
) -> Result<DescriptorProto> {
    let mut proto = DescriptorProto {
        name: Some(name),
        ..Default::default()
    };
    let mut struct_vec = vec![];
    let field_vec = fields
        .enumerate()
        .map(|(index, (name, data_type))| {
            let (field, des_proto) =
                build_protobuf_field(data_type, (index + 1) as i32, name.to_owned())?;
            if let Some(sv) = des_proto {
                struct_vec.push(sv);
            }
            Ok(field)
        })
        .collect::<Result<Vec<_>>>()?;
    proto.field = field_vec;
    proto.nested_type = struct_vec;
    Ok(proto)
}

fn build_protobuf_field(
    data_type: &DataType,
    index: i32,
    name: String,
) -> Result<(FieldDescriptorProto, Option<DescriptorProto>)> {
    let mut field = FieldDescriptorProto {
        name: Some(name.clone()),
        number: Some(index),
        ..Default::default()
    };
    match data_type {
        DataType::Boolean => field.r#type = Some(field_descriptor_proto::Type::Bool.into()),
        DataType::Int32 => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Int16 | DataType::Int64 => {
            field.r#type = Some(field_descriptor_proto::Type::Int64.into())
        }
        DataType::Float64 => field.r#type = Some(field_descriptor_proto::Type::Double.into()),
        DataType::Decimal => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Date => field.r#type = Some(field_descriptor_proto::Type::Int32.into()),
        DataType::Varchar => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Time => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamp => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Timestamptz => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Interval => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Struct(s) => {
            field.r#type = Some(field_descriptor_proto::Type::Message.into());
            let name = format!("Struct{}", name);
            let sub_proto = build_protobuf_schema(s.iter(), name.clone())?;
            field.type_name = Some(name);
            return Ok((field, Some(sub_proto)));
        }
        DataType::List(l) => {
            let (mut field, proto) = build_protobuf_field(l.as_ref(), index, name.clone())?;
            field.label = Some(field_descriptor_proto::Label::Repeated.into());
            return Ok((field, proto));
        }
        DataType::Bytea => field.r#type = Some(field_descriptor_proto::Type::Bytes.into()),
        DataType::Jsonb => field.r#type = Some(field_descriptor_proto::Type::String.into()),
        DataType::Serial => field.r#type = Some(field_descriptor_proto::Type::Int64.into()),
        DataType::Float32 | DataType::Int256 => {
            return Err(SinkError::BigQuery(anyhow::anyhow!(
                "BigQuery does not support Float32 or Int256"
            )));
        }
        DataType::Map(_) => todo!(),
    }
    Ok((field, None))
}

#[cfg(test)]
mod test {

    use std::assert_matches::assert_matches;

    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::{DataType, StructType};

    use crate::sink::big_query::{
        BigQuerySink, build_protobuf_descriptor_pool, build_protobuf_schema,
    };

    #[tokio::test]
    async fn test_type_check() {
        let big_query_type_string = "ARRAY<STRUCT<v1 ARRAY<INT64>, v2 STRUCT<v1 INT64, v2 INT64>>>";
        let rw_datatype = DataType::List(Box::new(DataType::Struct(StructType::new(vec![
            ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
            (
                "v2".to_owned(),
                DataType::Struct(StructType::new(vec![
                    ("v1".to_owned(), DataType::Int64),
                    ("v2".to_owned(), DataType::Int64),
                ])),
            ),
        ]))));
        assert_eq!(
            BigQuerySink::get_string_and_check_support_from_datatype(&rw_datatype).unwrap(),
            big_query_type_string
        );
    }

    #[tokio::test]
    async fn test_schema_check() {
        let schema = Schema {
            fields: vec![
                Field::with_name(DataType::Int64, "v1"),
                Field::with_name(DataType::Float64, "v2"),
                Field::with_name(
                    DataType::List(Box::new(DataType::Struct(StructType::new(vec![
                        ("v1".to_owned(), DataType::List(Box::new(DataType::Int64))),
                        (
                            "v3".to_owned(),
                            DataType::Struct(StructType::new(vec![
                                ("v1".to_owned(), DataType::Int64),
                                ("v2".to_owned(), DataType::Int64),
                            ])),
                        ),
                    ])))),
                    "v3",
                ),
            ],
        };
        let fields = schema
            .fields()
            .iter()
            .map(|f| (f.name.as_str(), &f.data_type));
        let desc = build_protobuf_schema(fields, "t1".to_owned()).unwrap();
        let pool = build_protobuf_descriptor_pool(&desc).unwrap();
        let t1_message = pool.get_message_by_name("t1").unwrap();
        assert_matches!(
            t1_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            t1_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Double
        );
        assert_matches!(
            t1_message.get_field_by_name("v3").unwrap().kind(),
            prost_reflect::Kind::Message(_)
        );

        let v3_message = pool.get_message_by_name("t1.Structv3").unwrap();
        assert_matches!(
            v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert!(v3_message.get_field_by_name("v1").unwrap().is_list());

        let v3_v3_message = pool.get_message_by_name("t1.Structv3.Structv3").unwrap();
        assert_matches!(
            v3_v3_message.get_field_by_name("v1").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
        assert_matches!(
            v3_v3_message.get_field_by_name("v2").unwrap().kind(),
            prost_reflect::Kind::Int64
        );
    }
}