risingwave_pb/
connector_service.serde.rs

1use crate::connector_service::*;
2impl serde::Serialize for CdcMessage {
3    #[allow(deprecated)]
4    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
5    where
6        S: serde::Serializer,
7    {
8        use serde::ser::SerializeStruct;
9        let mut len = 0;
10        if !self.payload.is_empty() {
11            len += 1;
12        }
13        if !self.partition.is_empty() {
14            len += 1;
15        }
16        if !self.offset.is_empty() {
17            len += 1;
18        }
19        if !self.full_table_name.is_empty() {
20            len += 1;
21        }
22        if self.source_ts_ms != 0 {
23            len += 1;
24        }
25        if self.msg_type != 0 {
26            len += 1;
27        }
28        if !self.key.is_empty() {
29            len += 1;
30        }
31        let mut struct_ser = serializer.serialize_struct("connector_service.CdcMessage", len)?;
32        if !self.payload.is_empty() {
33            struct_ser.serialize_field("payload", &self.payload)?;
34        }
35        if !self.partition.is_empty() {
36            struct_ser.serialize_field("partition", &self.partition)?;
37        }
38        if !self.offset.is_empty() {
39            struct_ser.serialize_field("offset", &self.offset)?;
40        }
41        if !self.full_table_name.is_empty() {
42            struct_ser.serialize_field("fullTableName", &self.full_table_name)?;
43        }
44        if self.source_ts_ms != 0 {
45            #[allow(clippy::needless_borrow)]
46            #[allow(clippy::needless_borrows_for_generic_args)]
47            struct_ser.serialize_field("sourceTsMs", ToString::to_string(&self.source_ts_ms).as_str())?;
48        }
49        if self.msg_type != 0 {
50            let v = cdc_message::CdcMessageType::try_from(self.msg_type)
51                .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.msg_type)))?;
52            struct_ser.serialize_field("msgType", &v)?;
53        }
54        if !self.key.is_empty() {
55            struct_ser.serialize_field("key", &self.key)?;
56        }
57        struct_ser.end()
58    }
59}
60impl<'de> serde::Deserialize<'de> for CdcMessage {
61    #[allow(deprecated)]
62    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
63    where
64        D: serde::Deserializer<'de>,
65    {
66        const FIELDS: &[&str] = &[
67            "payload",
68            "partition",
69            "offset",
70            "full_table_name",
71            "fullTableName",
72            "source_ts_ms",
73            "sourceTsMs",
74            "msg_type",
75            "msgType",
76            "key",
77        ];
78
79        #[allow(clippy::enum_variant_names)]
80        enum GeneratedField {
81            Payload,
82            Partition,
83            Offset,
84            FullTableName,
85            SourceTsMs,
86            MsgType,
87            Key,
88        }
89        impl<'de> serde::Deserialize<'de> for GeneratedField {
90            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
91            where
92                D: serde::Deserializer<'de>,
93            {
94                struct GeneratedVisitor;
95
96                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
97                    type Value = GeneratedField;
98
99                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100                        write!(formatter, "expected one of: {:?}", &FIELDS)
101                    }
102
103                    #[allow(unused_variables)]
104                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
105                    where
106                        E: serde::de::Error,
107                    {
108                        match value {
109                            "payload" => Ok(GeneratedField::Payload),
110                            "partition" => Ok(GeneratedField::Partition),
111                            "offset" => Ok(GeneratedField::Offset),
112                            "fullTableName" | "full_table_name" => Ok(GeneratedField::FullTableName),
113                            "sourceTsMs" | "source_ts_ms" => Ok(GeneratedField::SourceTsMs),
114                            "msgType" | "msg_type" => Ok(GeneratedField::MsgType),
115                            "key" => Ok(GeneratedField::Key),
116                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
117                        }
118                    }
119                }
120                deserializer.deserialize_identifier(GeneratedVisitor)
121            }
122        }
123        struct GeneratedVisitor;
124        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
125            type Value = CdcMessage;
126
127            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
128                formatter.write_str("struct connector_service.CdcMessage")
129            }
130
131            fn visit_map<V>(self, mut map_: V) -> std::result::Result<CdcMessage, V::Error>
132                where
133                    V: serde::de::MapAccess<'de>,
134            {
135                let mut payload__ = None;
136                let mut partition__ = None;
137                let mut offset__ = None;
138                let mut full_table_name__ = None;
139                let mut source_ts_ms__ = None;
140                let mut msg_type__ = None;
141                let mut key__ = None;
142                while let Some(k) = map_.next_key()? {
143                    match k {
144                        GeneratedField::Payload => {
145                            if payload__.is_some() {
146                                return Err(serde::de::Error::duplicate_field("payload"));
147                            }
148                            payload__ = Some(map_.next_value()?);
149                        }
150                        GeneratedField::Partition => {
151                            if partition__.is_some() {
152                                return Err(serde::de::Error::duplicate_field("partition"));
153                            }
154                            partition__ = Some(map_.next_value()?);
155                        }
156                        GeneratedField::Offset => {
157                            if offset__.is_some() {
158                                return Err(serde::de::Error::duplicate_field("offset"));
159                            }
160                            offset__ = Some(map_.next_value()?);
161                        }
162                        GeneratedField::FullTableName => {
163                            if full_table_name__.is_some() {
164                                return Err(serde::de::Error::duplicate_field("fullTableName"));
165                            }
166                            full_table_name__ = Some(map_.next_value()?);
167                        }
168                        GeneratedField::SourceTsMs => {
169                            if source_ts_ms__.is_some() {
170                                return Err(serde::de::Error::duplicate_field("sourceTsMs"));
171                            }
172                            source_ts_ms__ = 
173                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
174                            ;
175                        }
176                        GeneratedField::MsgType => {
177                            if msg_type__.is_some() {
178                                return Err(serde::de::Error::duplicate_field("msgType"));
179                            }
180                            msg_type__ = Some(map_.next_value::<cdc_message::CdcMessageType>()? as i32);
181                        }
182                        GeneratedField::Key => {
183                            if key__.is_some() {
184                                return Err(serde::de::Error::duplicate_field("key"));
185                            }
186                            key__ = Some(map_.next_value()?);
187                        }
188                    }
189                }
190                Ok(CdcMessage {
191                    payload: payload__.unwrap_or_default(),
192                    partition: partition__.unwrap_or_default(),
193                    offset: offset__.unwrap_or_default(),
194                    full_table_name: full_table_name__.unwrap_or_default(),
195                    source_ts_ms: source_ts_ms__.unwrap_or_default(),
196                    msg_type: msg_type__.unwrap_or_default(),
197                    key: key__.unwrap_or_default(),
198                })
199            }
200        }
201        deserializer.deserialize_struct("connector_service.CdcMessage", FIELDS, GeneratedVisitor)
202    }
203}
204impl serde::Serialize for cdc_message::CdcMessageType {
205    #[allow(deprecated)]
206    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
207    where
208        S: serde::Serializer,
209    {
210        let variant = match self {
211            Self::Unspecified => "UNSPECIFIED",
212            Self::Heartbeat => "HEARTBEAT",
213            Self::Data => "DATA",
214            Self::TransactionMeta => "TRANSACTION_META",
215            Self::SchemaChange => "SCHEMA_CHANGE",
216        };
217        serializer.serialize_str(variant)
218    }
219}
220impl<'de> serde::Deserialize<'de> for cdc_message::CdcMessageType {
221    #[allow(deprecated)]
222    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
223    where
224        D: serde::Deserializer<'de>,
225    {
226        const FIELDS: &[&str] = &[
227            "UNSPECIFIED",
228            "HEARTBEAT",
229            "DATA",
230            "TRANSACTION_META",
231            "SCHEMA_CHANGE",
232        ];
233
234        struct GeneratedVisitor;
235
236        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
237            type Value = cdc_message::CdcMessageType;
238
239            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
240                write!(formatter, "expected one of: {:?}", &FIELDS)
241            }
242
243            fn visit_i64<E>(self, v: i64) -> std::result::Result<Self::Value, E>
244            where
245                E: serde::de::Error,
246            {
247                i32::try_from(v)
248                    .ok()
249                    .and_then(|x| x.try_into().ok())
250                    .ok_or_else(|| {
251                        serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self)
252                    })
253            }
254
255            fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value, E>
256            where
257                E: serde::de::Error,
258            {
259                i32::try_from(v)
260                    .ok()
261                    .and_then(|x| x.try_into().ok())
262                    .ok_or_else(|| {
263                        serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self)
264                    })
265            }
266
267            fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
268            where
269                E: serde::de::Error,
270            {
271                match value {
272                    "UNSPECIFIED" => Ok(cdc_message::CdcMessageType::Unspecified),
273                    "HEARTBEAT" => Ok(cdc_message::CdcMessageType::Heartbeat),
274                    "DATA" => Ok(cdc_message::CdcMessageType::Data),
275                    "TRANSACTION_META" => Ok(cdc_message::CdcMessageType::TransactionMeta),
276                    "SCHEMA_CHANGE" => Ok(cdc_message::CdcMessageType::SchemaChange),
277                    _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
278                }
279            }
280        }
281        deserializer.deserialize_any(GeneratedVisitor)
282    }
283}
284impl serde::Serialize for CoordinateRequest {
285    #[allow(deprecated)]
286    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
287    where
288        S: serde::Serializer,
289    {
290        use serde::ser::SerializeStruct;
291        let mut len = 0;
292        if self.msg.is_some() {
293            len += 1;
294        }
295        let mut struct_ser = serializer.serialize_struct("connector_service.CoordinateRequest", len)?;
296        if let Some(v) = self.msg.as_ref() {
297            match v {
298                coordinate_request::Msg::StartRequest(v) => {
299                    struct_ser.serialize_field("startRequest", v)?;
300                }
301                coordinate_request::Msg::CommitRequest(v) => {
302                    struct_ser.serialize_field("commitRequest", v)?;
303                }
304                coordinate_request::Msg::UpdateVnodeRequest(v) => {
305                    struct_ser.serialize_field("updateVnodeRequest", v)?;
306                }
307                coordinate_request::Msg::Stop(v) => {
308                    struct_ser.serialize_field("stop", v)?;
309                }
310                coordinate_request::Msg::AlignInitialEpochRequest(v) => {
311                    #[allow(clippy::needless_borrow)]
312                    #[allow(clippy::needless_borrows_for_generic_args)]
313                    struct_ser.serialize_field("alignInitialEpochRequest", ToString::to_string(&v).as_str())?;
314                }
315            }
316        }
317        struct_ser.end()
318    }
319}
320impl<'de> serde::Deserialize<'de> for CoordinateRequest {
321    #[allow(deprecated)]
322    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
323    where
324        D: serde::Deserializer<'de>,
325    {
326        const FIELDS: &[&str] = &[
327            "start_request",
328            "startRequest",
329            "commit_request",
330            "commitRequest",
331            "update_vnode_request",
332            "updateVnodeRequest",
333            "stop",
334            "align_initial_epoch_request",
335            "alignInitialEpochRequest",
336        ];
337
338        #[allow(clippy::enum_variant_names)]
339        enum GeneratedField {
340            StartRequest,
341            CommitRequest,
342            UpdateVnodeRequest,
343            Stop,
344            AlignInitialEpochRequest,
345        }
346        impl<'de> serde::Deserialize<'de> for GeneratedField {
347            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
348            where
349                D: serde::Deserializer<'de>,
350            {
351                struct GeneratedVisitor;
352
353                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
354                    type Value = GeneratedField;
355
356                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
357                        write!(formatter, "expected one of: {:?}", &FIELDS)
358                    }
359
360                    #[allow(unused_variables)]
361                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
362                    where
363                        E: serde::de::Error,
364                    {
365                        match value {
366                            "startRequest" | "start_request" => Ok(GeneratedField::StartRequest),
367                            "commitRequest" | "commit_request" => Ok(GeneratedField::CommitRequest),
368                            "updateVnodeRequest" | "update_vnode_request" => Ok(GeneratedField::UpdateVnodeRequest),
369                            "stop" => Ok(GeneratedField::Stop),
370                            "alignInitialEpochRequest" | "align_initial_epoch_request" => Ok(GeneratedField::AlignInitialEpochRequest),
371                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
372                        }
373                    }
374                }
375                deserializer.deserialize_identifier(GeneratedVisitor)
376            }
377        }
378        struct GeneratedVisitor;
379        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
380            type Value = CoordinateRequest;
381
382            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
383                formatter.write_str("struct connector_service.CoordinateRequest")
384            }
385
386            fn visit_map<V>(self, mut map_: V) -> std::result::Result<CoordinateRequest, V::Error>
387                where
388                    V: serde::de::MapAccess<'de>,
389            {
390                let mut msg__ = None;
391                while let Some(k) = map_.next_key()? {
392                    match k {
393                        GeneratedField::StartRequest => {
394                            if msg__.is_some() {
395                                return Err(serde::de::Error::duplicate_field("startRequest"));
396                            }
397                            msg__ = map_.next_value::<::std::option::Option<_>>()?.map(coordinate_request::Msg::StartRequest)
398;
399                        }
400                        GeneratedField::CommitRequest => {
401                            if msg__.is_some() {
402                                return Err(serde::de::Error::duplicate_field("commitRequest"));
403                            }
404                            msg__ = map_.next_value::<::std::option::Option<_>>()?.map(coordinate_request::Msg::CommitRequest)
405;
406                        }
407                        GeneratedField::UpdateVnodeRequest => {
408                            if msg__.is_some() {
409                                return Err(serde::de::Error::duplicate_field("updateVnodeRequest"));
410                            }
411                            msg__ = map_.next_value::<::std::option::Option<_>>()?.map(coordinate_request::Msg::UpdateVnodeRequest)
412;
413                        }
414                        GeneratedField::Stop => {
415                            if msg__.is_some() {
416                                return Err(serde::de::Error::duplicate_field("stop"));
417                            }
418                            msg__ = map_.next_value::<::std::option::Option<_>>()?.map(coordinate_request::Msg::Stop);
419                        }
420                        GeneratedField::AlignInitialEpochRequest => {
421                            if msg__.is_some() {
422                                return Err(serde::de::Error::duplicate_field("alignInitialEpochRequest"));
423                            }
424                            msg__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| coordinate_request::Msg::AlignInitialEpochRequest(x.0));
425                        }
426                    }
427                }
428                Ok(CoordinateRequest {
429                    msg: msg__,
430                })
431            }
432        }
433        deserializer.deserialize_struct("connector_service.CoordinateRequest", FIELDS, GeneratedVisitor)
434    }
435}
436impl serde::Serialize for coordinate_request::CommitRequest {
437    #[allow(deprecated)]
438    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
439    where
440        S: serde::Serializer,
441    {
442        use serde::ser::SerializeStruct;
443        let mut len = 0;
444        if self.epoch != 0 {
445            len += 1;
446        }
447        if self.metadata.is_some() {
448            len += 1;
449        }
450        let mut struct_ser = serializer.serialize_struct("connector_service.CoordinateRequest.CommitRequest", len)?;
451        if self.epoch != 0 {
452            #[allow(clippy::needless_borrow)]
453            #[allow(clippy::needless_borrows_for_generic_args)]
454            struct_ser.serialize_field("epoch", ToString::to_string(&self.epoch).as_str())?;
455        }
456        if let Some(v) = self.metadata.as_ref() {
457            struct_ser.serialize_field("metadata", v)?;
458        }
459        struct_ser.end()
460    }
461}
462impl<'de> serde::Deserialize<'de> for coordinate_request::CommitRequest {
463    #[allow(deprecated)]
464    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
465    where
466        D: serde::Deserializer<'de>,
467    {
468        const FIELDS: &[&str] = &[
469            "epoch",
470            "metadata",
471        ];
472
473        #[allow(clippy::enum_variant_names)]
474        enum GeneratedField {
475            Epoch,
476            Metadata,
477        }
478        impl<'de> serde::Deserialize<'de> for GeneratedField {
479            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
480            where
481                D: serde::Deserializer<'de>,
482            {
483                struct GeneratedVisitor;
484
485                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
486                    type Value = GeneratedField;
487
488                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
489                        write!(formatter, "expected one of: {:?}", &FIELDS)
490                    }
491
492                    #[allow(unused_variables)]
493                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
494                    where
495                        E: serde::de::Error,
496                    {
497                        match value {
498                            "epoch" => Ok(GeneratedField::Epoch),
499                            "metadata" => Ok(GeneratedField::Metadata),
500                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
501                        }
502                    }
503                }
504                deserializer.deserialize_identifier(GeneratedVisitor)
505            }
506        }
507        struct GeneratedVisitor;
508        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
509            type Value = coordinate_request::CommitRequest;
510
511            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
512                formatter.write_str("struct connector_service.CoordinateRequest.CommitRequest")
513            }
514
515            fn visit_map<V>(self, mut map_: V) -> std::result::Result<coordinate_request::CommitRequest, V::Error>
516                where
517                    V: serde::de::MapAccess<'de>,
518            {
519                let mut epoch__ = None;
520                let mut metadata__ = None;
521                while let Some(k) = map_.next_key()? {
522                    match k {
523                        GeneratedField::Epoch => {
524                            if epoch__.is_some() {
525                                return Err(serde::de::Error::duplicate_field("epoch"));
526                            }
527                            epoch__ = 
528                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
529                            ;
530                        }
531                        GeneratedField::Metadata => {
532                            if metadata__.is_some() {
533                                return Err(serde::de::Error::duplicate_field("metadata"));
534                            }
535                            metadata__ = map_.next_value()?;
536                        }
537                    }
538                }
539                Ok(coordinate_request::CommitRequest {
540                    epoch: epoch__.unwrap_or_default(),
541                    metadata: metadata__,
542                })
543            }
544        }
545        deserializer.deserialize_struct("connector_service.CoordinateRequest.CommitRequest", FIELDS, GeneratedVisitor)
546    }
547}
548impl serde::Serialize for coordinate_request::StartCoordinationRequest {
549    #[allow(deprecated)]
550    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
551    where
552        S: serde::Serializer,
553    {
554        use serde::ser::SerializeStruct;
555        let mut len = 0;
556        if self.vnode_bitmap.is_some() {
557            len += 1;
558        }
559        if self.param.is_some() {
560            len += 1;
561        }
562        let mut struct_ser = serializer.serialize_struct("connector_service.CoordinateRequest.StartCoordinationRequest", len)?;
563        if let Some(v) = self.vnode_bitmap.as_ref() {
564            struct_ser.serialize_field("vnodeBitmap", v)?;
565        }
566        if let Some(v) = self.param.as_ref() {
567            struct_ser.serialize_field("param", v)?;
568        }
569        struct_ser.end()
570    }
571}
572impl<'de> serde::Deserialize<'de> for coordinate_request::StartCoordinationRequest {
573    #[allow(deprecated)]
574    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
575    where
576        D: serde::Deserializer<'de>,
577    {
578        const FIELDS: &[&str] = &[
579            "vnode_bitmap",
580            "vnodeBitmap",
581            "param",
582        ];
583
584        #[allow(clippy::enum_variant_names)]
585        enum GeneratedField {
586            VnodeBitmap,
587            Param,
588        }
589        impl<'de> serde::Deserialize<'de> for GeneratedField {
590            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
591            where
592                D: serde::Deserializer<'de>,
593            {
594                struct GeneratedVisitor;
595
596                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
597                    type Value = GeneratedField;
598
599                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
600                        write!(formatter, "expected one of: {:?}", &FIELDS)
601                    }
602
603                    #[allow(unused_variables)]
604                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
605                    where
606                        E: serde::de::Error,
607                    {
608                        match value {
609                            "vnodeBitmap" | "vnode_bitmap" => Ok(GeneratedField::VnodeBitmap),
610                            "param" => Ok(GeneratedField::Param),
611                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
612                        }
613                    }
614                }
615                deserializer.deserialize_identifier(GeneratedVisitor)
616            }
617        }
618        struct GeneratedVisitor;
619        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
620            type Value = coordinate_request::StartCoordinationRequest;
621
622            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
623                formatter.write_str("struct connector_service.CoordinateRequest.StartCoordinationRequest")
624            }
625
626            fn visit_map<V>(self, mut map_: V) -> std::result::Result<coordinate_request::StartCoordinationRequest, V::Error>
627                where
628                    V: serde::de::MapAccess<'de>,
629            {
630                let mut vnode_bitmap__ = None;
631                let mut param__ = None;
632                while let Some(k) = map_.next_key()? {
633                    match k {
634                        GeneratedField::VnodeBitmap => {
635                            if vnode_bitmap__.is_some() {
636                                return Err(serde::de::Error::duplicate_field("vnodeBitmap"));
637                            }
638                            vnode_bitmap__ = map_.next_value()?;
639                        }
640                        GeneratedField::Param => {
641                            if param__.is_some() {
642                                return Err(serde::de::Error::duplicate_field("param"));
643                            }
644                            param__ = map_.next_value()?;
645                        }
646                    }
647                }
648                Ok(coordinate_request::StartCoordinationRequest {
649                    vnode_bitmap: vnode_bitmap__,
650                    param: param__,
651                })
652            }
653        }
654        deserializer.deserialize_struct("connector_service.CoordinateRequest.StartCoordinationRequest", FIELDS, GeneratedVisitor)
655    }
656}
657impl serde::Serialize for coordinate_request::UpdateVnodeBitmapRequest {
658    #[allow(deprecated)]
659    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
660    where
661        S: serde::Serializer,
662    {
663        use serde::ser::SerializeStruct;
664        let mut len = 0;
665        if self.vnode_bitmap.is_some() {
666            len += 1;
667        }
668        let mut struct_ser = serializer.serialize_struct("connector_service.CoordinateRequest.UpdateVnodeBitmapRequest", len)?;
669        if let Some(v) = self.vnode_bitmap.as_ref() {
670            struct_ser.serialize_field("vnodeBitmap", v)?;
671        }
672        struct_ser.end()
673    }
674}
675impl<'de> serde::Deserialize<'de> for coordinate_request::UpdateVnodeBitmapRequest {
676    #[allow(deprecated)]
677    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
678    where
679        D: serde::Deserializer<'de>,
680    {
681        const FIELDS: &[&str] = &[
682            "vnode_bitmap",
683            "vnodeBitmap",
684        ];
685
686        #[allow(clippy::enum_variant_names)]
687        enum GeneratedField {
688            VnodeBitmap,
689        }
690        impl<'de> serde::Deserialize<'de> for GeneratedField {
691            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
692            where
693                D: serde::Deserializer<'de>,
694            {
695                struct GeneratedVisitor;
696
697                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
698                    type Value = GeneratedField;
699
700                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
701                        write!(formatter, "expected one of: {:?}", &FIELDS)
702                    }
703
704                    #[allow(unused_variables)]
705                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
706                    where
707                        E: serde::de::Error,
708                    {
709                        match value {
710                            "vnodeBitmap" | "vnode_bitmap" => Ok(GeneratedField::VnodeBitmap),
711                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
712                        }
713                    }
714                }
715                deserializer.deserialize_identifier(GeneratedVisitor)
716            }
717        }
718        struct GeneratedVisitor;
719        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
720            type Value = coordinate_request::UpdateVnodeBitmapRequest;
721
722            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
723                formatter.write_str("struct connector_service.CoordinateRequest.UpdateVnodeBitmapRequest")
724            }
725
726            fn visit_map<V>(self, mut map_: V) -> std::result::Result<coordinate_request::UpdateVnodeBitmapRequest, V::Error>
727                where
728                    V: serde::de::MapAccess<'de>,
729            {
730                let mut vnode_bitmap__ = None;
731                while let Some(k) = map_.next_key()? {
732                    match k {
733                        GeneratedField::VnodeBitmap => {
734                            if vnode_bitmap__.is_some() {
735                                return Err(serde::de::Error::duplicate_field("vnodeBitmap"));
736                            }
737                            vnode_bitmap__ = map_.next_value()?;
738                        }
739                    }
740                }
741                Ok(coordinate_request::UpdateVnodeBitmapRequest {
742                    vnode_bitmap: vnode_bitmap__,
743                })
744            }
745        }
746        deserializer.deserialize_struct("connector_service.CoordinateRequest.UpdateVnodeBitmapRequest", FIELDS, GeneratedVisitor)
747    }
748}
749impl serde::Serialize for CoordinateResponse {
750    #[allow(deprecated)]
751    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
752    where
753        S: serde::Serializer,
754    {
755        use serde::ser::SerializeStruct;
756        let mut len = 0;
757        if self.msg.is_some() {
758            len += 1;
759        }
760        let mut struct_ser = serializer.serialize_struct("connector_service.CoordinateResponse", len)?;
761        if let Some(v) = self.msg.as_ref() {
762            match v {
763                coordinate_response::Msg::StartResponse(v) => {
764                    struct_ser.serialize_field("startResponse", v)?;
765                }
766                coordinate_response::Msg::CommitResponse(v) => {
767                    struct_ser.serialize_field("commitResponse", v)?;
768                }
769                coordinate_response::Msg::Stopped(v) => {
770                    struct_ser.serialize_field("stopped", v)?;
771                }
772                coordinate_response::Msg::AlignInitialEpochResponse(v) => {
773                    #[allow(clippy::needless_borrow)]
774                    #[allow(clippy::needless_borrows_for_generic_args)]
775                    struct_ser.serialize_field("alignInitialEpochResponse", ToString::to_string(&v).as_str())?;
776                }
777            }
778        }
779        struct_ser.end()
780    }
781}
782impl<'de> serde::Deserialize<'de> for CoordinateResponse {
783    #[allow(deprecated)]
784    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
785    where
786        D: serde::Deserializer<'de>,
787    {
788        const FIELDS: &[&str] = &[
789            "start_response",
790            "startResponse",
791            "commit_response",
792            "commitResponse",
793            "stopped",
794            "align_initial_epoch_response",
795            "alignInitialEpochResponse",
796        ];
797
798        #[allow(clippy::enum_variant_names)]
799        enum GeneratedField {
800            StartResponse,
801            CommitResponse,
802            Stopped,
803            AlignInitialEpochResponse,
804        }
805        impl<'de> serde::Deserialize<'de> for GeneratedField {
806            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
807            where
808                D: serde::Deserializer<'de>,
809            {
810                struct GeneratedVisitor;
811
812                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
813                    type Value = GeneratedField;
814
815                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
816                        write!(formatter, "expected one of: {:?}", &FIELDS)
817                    }
818
819                    #[allow(unused_variables)]
820                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
821                    where
822                        E: serde::de::Error,
823                    {
824                        match value {
825                            "startResponse" | "start_response" => Ok(GeneratedField::StartResponse),
826                            "commitResponse" | "commit_response" => Ok(GeneratedField::CommitResponse),
827                            "stopped" => Ok(GeneratedField::Stopped),
828                            "alignInitialEpochResponse" | "align_initial_epoch_response" => Ok(GeneratedField::AlignInitialEpochResponse),
829                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
830                        }
831                    }
832                }
833                deserializer.deserialize_identifier(GeneratedVisitor)
834            }
835        }
836        struct GeneratedVisitor;
837        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
838            type Value = CoordinateResponse;
839
840            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
841                formatter.write_str("struct connector_service.CoordinateResponse")
842            }
843
844            fn visit_map<V>(self, mut map_: V) -> std::result::Result<CoordinateResponse, V::Error>
845                where
846                    V: serde::de::MapAccess<'de>,
847            {
848                let mut msg__ = None;
849                while let Some(k) = map_.next_key()? {
850                    match k {
851                        GeneratedField::StartResponse => {
852                            if msg__.is_some() {
853                                return Err(serde::de::Error::duplicate_field("startResponse"));
854                            }
855                            msg__ = map_.next_value::<::std::option::Option<_>>()?.map(coordinate_response::Msg::StartResponse)
856;
857                        }
858                        GeneratedField::CommitResponse => {
859                            if msg__.is_some() {
860                                return Err(serde::de::Error::duplicate_field("commitResponse"));
861                            }
862                            msg__ = map_.next_value::<::std::option::Option<_>>()?.map(coordinate_response::Msg::CommitResponse)
863;
864                        }
865                        GeneratedField::Stopped => {
866                            if msg__.is_some() {
867                                return Err(serde::de::Error::duplicate_field("stopped"));
868                            }
869                            msg__ = map_.next_value::<::std::option::Option<_>>()?.map(coordinate_response::Msg::Stopped);
870                        }
871                        GeneratedField::AlignInitialEpochResponse => {
872                            if msg__.is_some() {
873                                return Err(serde::de::Error::duplicate_field("alignInitialEpochResponse"));
874                            }
875                            msg__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| coordinate_response::Msg::AlignInitialEpochResponse(x.0));
876                        }
877                    }
878                }
879                Ok(CoordinateResponse {
880                    msg: msg__,
881                })
882            }
883        }
884        deserializer.deserialize_struct("connector_service.CoordinateResponse", FIELDS, GeneratedVisitor)
885    }
886}
887impl serde::Serialize for coordinate_response::CommitResponse {
888    #[allow(deprecated)]
889    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
890    where
891        S: serde::Serializer,
892    {
893        use serde::ser::SerializeStruct;
894        let mut len = 0;
895        if self.epoch != 0 {
896            len += 1;
897        }
898        let mut struct_ser = serializer.serialize_struct("connector_service.CoordinateResponse.CommitResponse", len)?;
899        if self.epoch != 0 {
900            #[allow(clippy::needless_borrow)]
901            #[allow(clippy::needless_borrows_for_generic_args)]
902            struct_ser.serialize_field("epoch", ToString::to_string(&self.epoch).as_str())?;
903        }
904        struct_ser.end()
905    }
906}
907impl<'de> serde::Deserialize<'de> for coordinate_response::CommitResponse {
908    #[allow(deprecated)]
909    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
910    where
911        D: serde::Deserializer<'de>,
912    {
913        const FIELDS: &[&str] = &[
914            "epoch",
915        ];
916
917        #[allow(clippy::enum_variant_names)]
918        enum GeneratedField {
919            Epoch,
920        }
921        impl<'de> serde::Deserialize<'de> for GeneratedField {
922            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
923            where
924                D: serde::Deserializer<'de>,
925            {
926                struct GeneratedVisitor;
927
928                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
929                    type Value = GeneratedField;
930
931                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
932                        write!(formatter, "expected one of: {:?}", &FIELDS)
933                    }
934
935                    #[allow(unused_variables)]
936                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
937                    where
938                        E: serde::de::Error,
939                    {
940                        match value {
941                            "epoch" => Ok(GeneratedField::Epoch),
942                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
943                        }
944                    }
945                }
946                deserializer.deserialize_identifier(GeneratedVisitor)
947            }
948        }
949        struct GeneratedVisitor;
950        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
951            type Value = coordinate_response::CommitResponse;
952
953            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
954                formatter.write_str("struct connector_service.CoordinateResponse.CommitResponse")
955            }
956
957            fn visit_map<V>(self, mut map_: V) -> std::result::Result<coordinate_response::CommitResponse, V::Error>
958                where
959                    V: serde::de::MapAccess<'de>,
960            {
961                let mut epoch__ = None;
962                while let Some(k) = map_.next_key()? {
963                    match k {
964                        GeneratedField::Epoch => {
965                            if epoch__.is_some() {
966                                return Err(serde::de::Error::duplicate_field("epoch"));
967                            }
968                            epoch__ = 
969                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
970                            ;
971                        }
972                    }
973                }
974                Ok(coordinate_response::CommitResponse {
975                    epoch: epoch__.unwrap_or_default(),
976                })
977            }
978        }
979        deserializer.deserialize_struct("connector_service.CoordinateResponse.CommitResponse", FIELDS, GeneratedVisitor)
980    }
981}
982impl serde::Serialize for coordinate_response::StartCoordinationResponse {
983    #[allow(deprecated)]
984    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
985    where
986        S: serde::Serializer,
987    {
988        use serde::ser::SerializeStruct;
989        let mut len = 0;
990        if self.log_store_rewind_start_epoch.is_some() {
991            len += 1;
992        }
993        let mut struct_ser = serializer.serialize_struct("connector_service.CoordinateResponse.StartCoordinationResponse", len)?;
994        if let Some(v) = self.log_store_rewind_start_epoch.as_ref() {
995            #[allow(clippy::needless_borrow)]
996            #[allow(clippy::needless_borrows_for_generic_args)]
997            struct_ser.serialize_field("logStoreRewindStartEpoch", ToString::to_string(&v).as_str())?;
998        }
999        struct_ser.end()
1000    }
1001}
1002impl<'de> serde::Deserialize<'de> for coordinate_response::StartCoordinationResponse {
1003    #[allow(deprecated)]
1004    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1005    where
1006        D: serde::Deserializer<'de>,
1007    {
1008        const FIELDS: &[&str] = &[
1009            "log_store_rewind_start_epoch",
1010            "logStoreRewindStartEpoch",
1011        ];
1012
1013        #[allow(clippy::enum_variant_names)]
1014        enum GeneratedField {
1015            LogStoreRewindStartEpoch,
1016        }
1017        impl<'de> serde::Deserialize<'de> for GeneratedField {
1018            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
1019            where
1020                D: serde::Deserializer<'de>,
1021            {
1022                struct GeneratedVisitor;
1023
1024                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1025                    type Value = GeneratedField;
1026
1027                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1028                        write!(formatter, "expected one of: {:?}", &FIELDS)
1029                    }
1030
1031                    #[allow(unused_variables)]
1032                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
1033                    where
1034                        E: serde::de::Error,
1035                    {
1036                        match value {
1037                            "logStoreRewindStartEpoch" | "log_store_rewind_start_epoch" => Ok(GeneratedField::LogStoreRewindStartEpoch),
1038                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
1039                        }
1040                    }
1041                }
1042                deserializer.deserialize_identifier(GeneratedVisitor)
1043            }
1044        }
1045        struct GeneratedVisitor;
1046        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1047            type Value = coordinate_response::StartCoordinationResponse;
1048
1049            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1050                formatter.write_str("struct connector_service.CoordinateResponse.StartCoordinationResponse")
1051            }
1052
1053            fn visit_map<V>(self, mut map_: V) -> std::result::Result<coordinate_response::StartCoordinationResponse, V::Error>
1054                where
1055                    V: serde::de::MapAccess<'de>,
1056            {
1057                let mut log_store_rewind_start_epoch__ = None;
1058                while let Some(k) = map_.next_key()? {
1059                    match k {
1060                        GeneratedField::LogStoreRewindStartEpoch => {
1061                            if log_store_rewind_start_epoch__.is_some() {
1062                                return Err(serde::de::Error::duplicate_field("logStoreRewindStartEpoch"));
1063                            }
1064                            log_store_rewind_start_epoch__ = 
1065                                map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0)
1066                            ;
1067                        }
1068                    }
1069                }
1070                Ok(coordinate_response::StartCoordinationResponse {
1071                    log_store_rewind_start_epoch: log_store_rewind_start_epoch__,
1072                })
1073            }
1074        }
1075        deserializer.deserialize_struct("connector_service.CoordinateResponse.StartCoordinationResponse", FIELDS, GeneratedVisitor)
1076    }
1077}
1078impl serde::Serialize for GetEventStreamRequest {
1079    #[allow(deprecated)]
1080    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1081    where
1082        S: serde::Serializer,
1083    {
1084        use serde::ser::SerializeStruct;
1085        let mut len = 0;
1086        if self.source_id != 0 {
1087            len += 1;
1088        }
1089        if self.source_type != 0 {
1090            len += 1;
1091        }
1092        if !self.start_offset.is_empty() {
1093            len += 1;
1094        }
1095        if !self.properties.is_empty() {
1096            len += 1;
1097        }
1098        if self.snapshot_done {
1099            len += 1;
1100        }
1101        if self.is_source_job {
1102            len += 1;
1103        }
1104        let mut struct_ser = serializer.serialize_struct("connector_service.GetEventStreamRequest", len)?;
1105        if self.source_id != 0 {
1106            #[allow(clippy::needless_borrow)]
1107            #[allow(clippy::needless_borrows_for_generic_args)]
1108            struct_ser.serialize_field("sourceId", ToString::to_string(&self.source_id).as_str())?;
1109        }
1110        if self.source_type != 0 {
1111            let v = SourceType::try_from(self.source_type)
1112                .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.source_type)))?;
1113            struct_ser.serialize_field("sourceType", &v)?;
1114        }
1115        if !self.start_offset.is_empty() {
1116            struct_ser.serialize_field("startOffset", &self.start_offset)?;
1117        }
1118        if !self.properties.is_empty() {
1119            struct_ser.serialize_field("properties", &self.properties)?;
1120        }
1121        if self.snapshot_done {
1122            struct_ser.serialize_field("snapshotDone", &self.snapshot_done)?;
1123        }
1124        if self.is_source_job {
1125            struct_ser.serialize_field("isSourceJob", &self.is_source_job)?;
1126        }
1127        struct_ser.end()
1128    }
1129}
1130impl<'de> serde::Deserialize<'de> for GetEventStreamRequest {
1131    #[allow(deprecated)]
1132    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1133    where
1134        D: serde::Deserializer<'de>,
1135    {
1136        const FIELDS: &[&str] = &[
1137            "source_id",
1138            "sourceId",
1139            "source_type",
1140            "sourceType",
1141            "start_offset",
1142            "startOffset",
1143            "properties",
1144            "snapshot_done",
1145            "snapshotDone",
1146            "is_source_job",
1147            "isSourceJob",
1148        ];
1149
1150        #[allow(clippy::enum_variant_names)]
1151        enum GeneratedField {
1152            SourceId,
1153            SourceType,
1154            StartOffset,
1155            Properties,
1156            SnapshotDone,
1157            IsSourceJob,
1158        }
1159        impl<'de> serde::Deserialize<'de> for GeneratedField {
1160            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
1161            where
1162                D: serde::Deserializer<'de>,
1163            {
1164                struct GeneratedVisitor;
1165
1166                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1167                    type Value = GeneratedField;
1168
1169                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1170                        write!(formatter, "expected one of: {:?}", &FIELDS)
1171                    }
1172
1173                    #[allow(unused_variables)]
1174                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
1175                    where
1176                        E: serde::de::Error,
1177                    {
1178                        match value {
1179                            "sourceId" | "source_id" => Ok(GeneratedField::SourceId),
1180                            "sourceType" | "source_type" => Ok(GeneratedField::SourceType),
1181                            "startOffset" | "start_offset" => Ok(GeneratedField::StartOffset),
1182                            "properties" => Ok(GeneratedField::Properties),
1183                            "snapshotDone" | "snapshot_done" => Ok(GeneratedField::SnapshotDone),
1184                            "isSourceJob" | "is_source_job" => Ok(GeneratedField::IsSourceJob),
1185                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
1186                        }
1187                    }
1188                }
1189                deserializer.deserialize_identifier(GeneratedVisitor)
1190            }
1191        }
1192        struct GeneratedVisitor;
1193        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1194            type Value = GetEventStreamRequest;
1195
1196            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1197                formatter.write_str("struct connector_service.GetEventStreamRequest")
1198            }
1199
1200            fn visit_map<V>(self, mut map_: V) -> std::result::Result<GetEventStreamRequest, V::Error>
1201                where
1202                    V: serde::de::MapAccess<'de>,
1203            {
1204                let mut source_id__ = None;
1205                let mut source_type__ = None;
1206                let mut start_offset__ = None;
1207                let mut properties__ = None;
1208                let mut snapshot_done__ = None;
1209                let mut is_source_job__ = None;
1210                while let Some(k) = map_.next_key()? {
1211                    match k {
1212                        GeneratedField::SourceId => {
1213                            if source_id__.is_some() {
1214                                return Err(serde::de::Error::duplicate_field("sourceId"));
1215                            }
1216                            source_id__ = 
1217                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
1218                            ;
1219                        }
1220                        GeneratedField::SourceType => {
1221                            if source_type__.is_some() {
1222                                return Err(serde::de::Error::duplicate_field("sourceType"));
1223                            }
1224                            source_type__ = Some(map_.next_value::<SourceType>()? as i32);
1225                        }
1226                        GeneratedField::StartOffset => {
1227                            if start_offset__.is_some() {
1228                                return Err(serde::de::Error::duplicate_field("startOffset"));
1229                            }
1230                            start_offset__ = Some(map_.next_value()?);
1231                        }
1232                        GeneratedField::Properties => {
1233                            if properties__.is_some() {
1234                                return Err(serde::de::Error::duplicate_field("properties"));
1235                            }
1236                            properties__ = Some(
1237                                map_.next_value::<std::collections::BTreeMap<_, _>>()?
1238                            );
1239                        }
1240                        GeneratedField::SnapshotDone => {
1241                            if snapshot_done__.is_some() {
1242                                return Err(serde::de::Error::duplicate_field("snapshotDone"));
1243                            }
1244                            snapshot_done__ = Some(map_.next_value()?);
1245                        }
1246                        GeneratedField::IsSourceJob => {
1247                            if is_source_job__.is_some() {
1248                                return Err(serde::de::Error::duplicate_field("isSourceJob"));
1249                            }
1250                            is_source_job__ = Some(map_.next_value()?);
1251                        }
1252                    }
1253                }
1254                Ok(GetEventStreamRequest {
1255                    source_id: source_id__.unwrap_or_default(),
1256                    source_type: source_type__.unwrap_or_default(),
1257                    start_offset: start_offset__.unwrap_or_default(),
1258                    properties: properties__.unwrap_or_default(),
1259                    snapshot_done: snapshot_done__.unwrap_or_default(),
1260                    is_source_job: is_source_job__.unwrap_or_default(),
1261                })
1262            }
1263        }
1264        deserializer.deserialize_struct("connector_service.GetEventStreamRequest", FIELDS, GeneratedVisitor)
1265    }
1266}
1267impl serde::Serialize for GetEventStreamResponse {
1268    #[allow(deprecated)]
1269    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1270    where
1271        S: serde::Serializer,
1272    {
1273        use serde::ser::SerializeStruct;
1274        let mut len = 0;
1275        if self.source_id != 0 {
1276            len += 1;
1277        }
1278        if !self.events.is_empty() {
1279            len += 1;
1280        }
1281        if self.control.is_some() {
1282            len += 1;
1283        }
1284        let mut struct_ser = serializer.serialize_struct("connector_service.GetEventStreamResponse", len)?;
1285        if self.source_id != 0 {
1286            #[allow(clippy::needless_borrow)]
1287            #[allow(clippy::needless_borrows_for_generic_args)]
1288            struct_ser.serialize_field("sourceId", ToString::to_string(&self.source_id).as_str())?;
1289        }
1290        if !self.events.is_empty() {
1291            struct_ser.serialize_field("events", &self.events)?;
1292        }
1293        if let Some(v) = self.control.as_ref() {
1294            struct_ser.serialize_field("control", v)?;
1295        }
1296        struct_ser.end()
1297    }
1298}
1299impl<'de> serde::Deserialize<'de> for GetEventStreamResponse {
1300    #[allow(deprecated)]
1301    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1302    where
1303        D: serde::Deserializer<'de>,
1304    {
1305        const FIELDS: &[&str] = &[
1306            "source_id",
1307            "sourceId",
1308            "events",
1309            "control",
1310        ];
1311
1312        #[allow(clippy::enum_variant_names)]
1313        enum GeneratedField {
1314            SourceId,
1315            Events,
1316            Control,
1317        }
1318        impl<'de> serde::Deserialize<'de> for GeneratedField {
1319            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
1320            where
1321                D: serde::Deserializer<'de>,
1322            {
1323                struct GeneratedVisitor;
1324
1325                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1326                    type Value = GeneratedField;
1327
1328                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1329                        write!(formatter, "expected one of: {:?}", &FIELDS)
1330                    }
1331
1332                    #[allow(unused_variables)]
1333                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
1334                    where
1335                        E: serde::de::Error,
1336                    {
1337                        match value {
1338                            "sourceId" | "source_id" => Ok(GeneratedField::SourceId),
1339                            "events" => Ok(GeneratedField::Events),
1340                            "control" => Ok(GeneratedField::Control),
1341                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
1342                        }
1343                    }
1344                }
1345                deserializer.deserialize_identifier(GeneratedVisitor)
1346            }
1347        }
1348        struct GeneratedVisitor;
1349        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1350            type Value = GetEventStreamResponse;
1351
1352            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1353                formatter.write_str("struct connector_service.GetEventStreamResponse")
1354            }
1355
1356            fn visit_map<V>(self, mut map_: V) -> std::result::Result<GetEventStreamResponse, V::Error>
1357                where
1358                    V: serde::de::MapAccess<'de>,
1359            {
1360                let mut source_id__ = None;
1361                let mut events__ = None;
1362                let mut control__ = None;
1363                while let Some(k) = map_.next_key()? {
1364                    match k {
1365                        GeneratedField::SourceId => {
1366                            if source_id__.is_some() {
1367                                return Err(serde::de::Error::duplicate_field("sourceId"));
1368                            }
1369                            source_id__ = 
1370                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
1371                            ;
1372                        }
1373                        GeneratedField::Events => {
1374                            if events__.is_some() {
1375                                return Err(serde::de::Error::duplicate_field("events"));
1376                            }
1377                            events__ = Some(map_.next_value()?);
1378                        }
1379                        GeneratedField::Control => {
1380                            if control__.is_some() {
1381                                return Err(serde::de::Error::duplicate_field("control"));
1382                            }
1383                            control__ = map_.next_value()?;
1384                        }
1385                    }
1386                }
1387                Ok(GetEventStreamResponse {
1388                    source_id: source_id__.unwrap_or_default(),
1389                    events: events__.unwrap_or_default(),
1390                    control: control__,
1391                })
1392            }
1393        }
1394        deserializer.deserialize_struct("connector_service.GetEventStreamResponse", FIELDS, GeneratedVisitor)
1395    }
1396}
1397impl serde::Serialize for get_event_stream_response::ControlInfo {
1398    #[allow(deprecated)]
1399    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1400    where
1401        S: serde::Serializer,
1402    {
1403        use serde::ser::SerializeStruct;
1404        let mut len = 0;
1405        if self.handshake_ok {
1406            len += 1;
1407        }
1408        let mut struct_ser = serializer.serialize_struct("connector_service.GetEventStreamResponse.ControlInfo", len)?;
1409        if self.handshake_ok {
1410            struct_ser.serialize_field("handshakeOk", &self.handshake_ok)?;
1411        }
1412        struct_ser.end()
1413    }
1414}
1415impl<'de> serde::Deserialize<'de> for get_event_stream_response::ControlInfo {
1416    #[allow(deprecated)]
1417    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1418    where
1419        D: serde::Deserializer<'de>,
1420    {
1421        const FIELDS: &[&str] = &[
1422            "handshake_ok",
1423            "handshakeOk",
1424        ];
1425
1426        #[allow(clippy::enum_variant_names)]
1427        enum GeneratedField {
1428            HandshakeOk,
1429        }
1430        impl<'de> serde::Deserialize<'de> for GeneratedField {
1431            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
1432            where
1433                D: serde::Deserializer<'de>,
1434            {
1435                struct GeneratedVisitor;
1436
1437                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1438                    type Value = GeneratedField;
1439
1440                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1441                        write!(formatter, "expected one of: {:?}", &FIELDS)
1442                    }
1443
1444                    #[allow(unused_variables)]
1445                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
1446                    where
1447                        E: serde::de::Error,
1448                    {
1449                        match value {
1450                            "handshakeOk" | "handshake_ok" => Ok(GeneratedField::HandshakeOk),
1451                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
1452                        }
1453                    }
1454                }
1455                deserializer.deserialize_identifier(GeneratedVisitor)
1456            }
1457        }
1458        struct GeneratedVisitor;
1459        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1460            type Value = get_event_stream_response::ControlInfo;
1461
1462            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1463                formatter.write_str("struct connector_service.GetEventStreamResponse.ControlInfo")
1464            }
1465
1466            fn visit_map<V>(self, mut map_: V) -> std::result::Result<get_event_stream_response::ControlInfo, V::Error>
1467                where
1468                    V: serde::de::MapAccess<'de>,
1469            {
1470                let mut handshake_ok__ = None;
1471                while let Some(k) = map_.next_key()? {
1472                    match k {
1473                        GeneratedField::HandshakeOk => {
1474                            if handshake_ok__.is_some() {
1475                                return Err(serde::de::Error::duplicate_field("handshakeOk"));
1476                            }
1477                            handshake_ok__ = Some(map_.next_value()?);
1478                        }
1479                    }
1480                }
1481                Ok(get_event_stream_response::ControlInfo {
1482                    handshake_ok: handshake_ok__.unwrap_or_default(),
1483                })
1484            }
1485        }
1486        deserializer.deserialize_struct("connector_service.GetEventStreamResponse.ControlInfo", FIELDS, GeneratedVisitor)
1487    }
1488}
1489impl serde::Serialize for SinkCoordinatorStreamRequest {
1490    #[allow(deprecated)]
1491    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1492    where
1493        S: serde::Serializer,
1494    {
1495        use serde::ser::SerializeStruct;
1496        let mut len = 0;
1497        if self.request.is_some() {
1498            len += 1;
1499        }
1500        let mut struct_ser = serializer.serialize_struct("connector_service.SinkCoordinatorStreamRequest", len)?;
1501        if let Some(v) = self.request.as_ref() {
1502            match v {
1503                sink_coordinator_stream_request::Request::Start(v) => {
1504                    struct_ser.serialize_field("start", v)?;
1505                }
1506                sink_coordinator_stream_request::Request::Commit(v) => {
1507                    struct_ser.serialize_field("commit", v)?;
1508                }
1509            }
1510        }
1511        struct_ser.end()
1512    }
1513}
1514impl<'de> serde::Deserialize<'de> for SinkCoordinatorStreamRequest {
1515    #[allow(deprecated)]
1516    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1517    where
1518        D: serde::Deserializer<'de>,
1519    {
1520        const FIELDS: &[&str] = &[
1521            "start",
1522            "commit",
1523        ];
1524
1525        #[allow(clippy::enum_variant_names)]
1526        enum GeneratedField {
1527            Start,
1528            Commit,
1529        }
1530        impl<'de> serde::Deserialize<'de> for GeneratedField {
1531            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
1532            where
1533                D: serde::Deserializer<'de>,
1534            {
1535                struct GeneratedVisitor;
1536
1537                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1538                    type Value = GeneratedField;
1539
1540                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1541                        write!(formatter, "expected one of: {:?}", &FIELDS)
1542                    }
1543
1544                    #[allow(unused_variables)]
1545                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
1546                    where
1547                        E: serde::de::Error,
1548                    {
1549                        match value {
1550                            "start" => Ok(GeneratedField::Start),
1551                            "commit" => Ok(GeneratedField::Commit),
1552                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
1553                        }
1554                    }
1555                }
1556                deserializer.deserialize_identifier(GeneratedVisitor)
1557            }
1558        }
1559        struct GeneratedVisitor;
1560        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1561            type Value = SinkCoordinatorStreamRequest;
1562
1563            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1564                formatter.write_str("struct connector_service.SinkCoordinatorStreamRequest")
1565            }
1566
1567            fn visit_map<V>(self, mut map_: V) -> std::result::Result<SinkCoordinatorStreamRequest, V::Error>
1568                where
1569                    V: serde::de::MapAccess<'de>,
1570            {
1571                let mut request__ = None;
1572                while let Some(k) = map_.next_key()? {
1573                    match k {
1574                        GeneratedField::Start => {
1575                            if request__.is_some() {
1576                                return Err(serde::de::Error::duplicate_field("start"));
1577                            }
1578                            request__ = map_.next_value::<::std::option::Option<_>>()?.map(sink_coordinator_stream_request::Request::Start)
1579;
1580                        }
1581                        GeneratedField::Commit => {
1582                            if request__.is_some() {
1583                                return Err(serde::de::Error::duplicate_field("commit"));
1584                            }
1585                            request__ = map_.next_value::<::std::option::Option<_>>()?.map(sink_coordinator_stream_request::Request::Commit)
1586;
1587                        }
1588                    }
1589                }
1590                Ok(SinkCoordinatorStreamRequest {
1591                    request: request__,
1592                })
1593            }
1594        }
1595        deserializer.deserialize_struct("connector_service.SinkCoordinatorStreamRequest", FIELDS, GeneratedVisitor)
1596    }
1597}
1598impl serde::Serialize for sink_coordinator_stream_request::CommitMetadata {
1599    #[allow(deprecated)]
1600    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1601    where
1602        S: serde::Serializer,
1603    {
1604        use serde::ser::SerializeStruct;
1605        let mut len = 0;
1606        if self.epoch != 0 {
1607            len += 1;
1608        }
1609        if !self.metadata.is_empty() {
1610            len += 1;
1611        }
1612        let mut struct_ser = serializer.serialize_struct("connector_service.SinkCoordinatorStreamRequest.CommitMetadata", len)?;
1613        if self.epoch != 0 {
1614            #[allow(clippy::needless_borrow)]
1615            #[allow(clippy::needless_borrows_for_generic_args)]
1616            struct_ser.serialize_field("epoch", ToString::to_string(&self.epoch).as_str())?;
1617        }
1618        if !self.metadata.is_empty() {
1619            struct_ser.serialize_field("metadata", &self.metadata)?;
1620        }
1621        struct_ser.end()
1622    }
1623}
1624impl<'de> serde::Deserialize<'de> for sink_coordinator_stream_request::CommitMetadata {
1625    #[allow(deprecated)]
1626    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1627    where
1628        D: serde::Deserializer<'de>,
1629    {
1630        const FIELDS: &[&str] = &[
1631            "epoch",
1632            "metadata",
1633        ];
1634
1635        #[allow(clippy::enum_variant_names)]
1636        enum GeneratedField {
1637            Epoch,
1638            Metadata,
1639        }
1640        impl<'de> serde::Deserialize<'de> for GeneratedField {
1641            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
1642            where
1643                D: serde::Deserializer<'de>,
1644            {
1645                struct GeneratedVisitor;
1646
1647                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1648                    type Value = GeneratedField;
1649
1650                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1651                        write!(formatter, "expected one of: {:?}", &FIELDS)
1652                    }
1653
1654                    #[allow(unused_variables)]
1655                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
1656                    where
1657                        E: serde::de::Error,
1658                    {
1659                        match value {
1660                            "epoch" => Ok(GeneratedField::Epoch),
1661                            "metadata" => Ok(GeneratedField::Metadata),
1662                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
1663                        }
1664                    }
1665                }
1666                deserializer.deserialize_identifier(GeneratedVisitor)
1667            }
1668        }
1669        struct GeneratedVisitor;
1670        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1671            type Value = sink_coordinator_stream_request::CommitMetadata;
1672
1673            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1674                formatter.write_str("struct connector_service.SinkCoordinatorStreamRequest.CommitMetadata")
1675            }
1676
1677            fn visit_map<V>(self, mut map_: V) -> std::result::Result<sink_coordinator_stream_request::CommitMetadata, V::Error>
1678                where
1679                    V: serde::de::MapAccess<'de>,
1680            {
1681                let mut epoch__ = None;
1682                let mut metadata__ = None;
1683                while let Some(k) = map_.next_key()? {
1684                    match k {
1685                        GeneratedField::Epoch => {
1686                            if epoch__.is_some() {
1687                                return Err(serde::de::Error::duplicate_field("epoch"));
1688                            }
1689                            epoch__ = 
1690                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
1691                            ;
1692                        }
1693                        GeneratedField::Metadata => {
1694                            if metadata__.is_some() {
1695                                return Err(serde::de::Error::duplicate_field("metadata"));
1696                            }
1697                            metadata__ = Some(map_.next_value()?);
1698                        }
1699                    }
1700                }
1701                Ok(sink_coordinator_stream_request::CommitMetadata {
1702                    epoch: epoch__.unwrap_or_default(),
1703                    metadata: metadata__.unwrap_or_default(),
1704                })
1705            }
1706        }
1707        deserializer.deserialize_struct("connector_service.SinkCoordinatorStreamRequest.CommitMetadata", FIELDS, GeneratedVisitor)
1708    }
1709}
1710impl serde::Serialize for sink_coordinator_stream_request::StartCoordinator {
1711    #[allow(deprecated)]
1712    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1713    where
1714        S: serde::Serializer,
1715    {
1716        use serde::ser::SerializeStruct;
1717        let mut len = 0;
1718        if self.param.is_some() {
1719            len += 1;
1720        }
1721        let mut struct_ser = serializer.serialize_struct("connector_service.SinkCoordinatorStreamRequest.StartCoordinator", len)?;
1722        if let Some(v) = self.param.as_ref() {
1723            struct_ser.serialize_field("param", v)?;
1724        }
1725        struct_ser.end()
1726    }
1727}
1728impl<'de> serde::Deserialize<'de> for sink_coordinator_stream_request::StartCoordinator {
1729    #[allow(deprecated)]
1730    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1731    where
1732        D: serde::Deserializer<'de>,
1733    {
1734        const FIELDS: &[&str] = &[
1735            "param",
1736        ];
1737
1738        #[allow(clippy::enum_variant_names)]
1739        enum GeneratedField {
1740            Param,
1741        }
1742        impl<'de> serde::Deserialize<'de> for GeneratedField {
1743            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
1744            where
1745                D: serde::Deserializer<'de>,
1746            {
1747                struct GeneratedVisitor;
1748
1749                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1750                    type Value = GeneratedField;
1751
1752                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1753                        write!(formatter, "expected one of: {:?}", &FIELDS)
1754                    }
1755
1756                    #[allow(unused_variables)]
1757                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
1758                    where
1759                        E: serde::de::Error,
1760                    {
1761                        match value {
1762                            "param" => Ok(GeneratedField::Param),
1763                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
1764                        }
1765                    }
1766                }
1767                deserializer.deserialize_identifier(GeneratedVisitor)
1768            }
1769        }
1770        struct GeneratedVisitor;
1771        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1772            type Value = sink_coordinator_stream_request::StartCoordinator;
1773
1774            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1775                formatter.write_str("struct connector_service.SinkCoordinatorStreamRequest.StartCoordinator")
1776            }
1777
1778            fn visit_map<V>(self, mut map_: V) -> std::result::Result<sink_coordinator_stream_request::StartCoordinator, V::Error>
1779                where
1780                    V: serde::de::MapAccess<'de>,
1781            {
1782                let mut param__ = None;
1783                while let Some(k) = map_.next_key()? {
1784                    match k {
1785                        GeneratedField::Param => {
1786                            if param__.is_some() {
1787                                return Err(serde::de::Error::duplicate_field("param"));
1788                            }
1789                            param__ = map_.next_value()?;
1790                        }
1791                    }
1792                }
1793                Ok(sink_coordinator_stream_request::StartCoordinator {
1794                    param: param__,
1795                })
1796            }
1797        }
1798        deserializer.deserialize_struct("connector_service.SinkCoordinatorStreamRequest.StartCoordinator", FIELDS, GeneratedVisitor)
1799    }
1800}
1801impl serde::Serialize for SinkCoordinatorStreamResponse {
1802    #[allow(deprecated)]
1803    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1804    where
1805        S: serde::Serializer,
1806    {
1807        use serde::ser::SerializeStruct;
1808        let mut len = 0;
1809        if self.response.is_some() {
1810            len += 1;
1811        }
1812        let mut struct_ser = serializer.serialize_struct("connector_service.SinkCoordinatorStreamResponse", len)?;
1813        if let Some(v) = self.response.as_ref() {
1814            match v {
1815                sink_coordinator_stream_response::Response::Start(v) => {
1816                    struct_ser.serialize_field("start", v)?;
1817                }
1818                sink_coordinator_stream_response::Response::Commit(v) => {
1819                    struct_ser.serialize_field("commit", v)?;
1820                }
1821            }
1822        }
1823        struct_ser.end()
1824    }
1825}
1826impl<'de> serde::Deserialize<'de> for SinkCoordinatorStreamResponse {
1827    #[allow(deprecated)]
1828    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1829    where
1830        D: serde::Deserializer<'de>,
1831    {
1832        const FIELDS: &[&str] = &[
1833            "start",
1834            "commit",
1835        ];
1836
1837        #[allow(clippy::enum_variant_names)]
1838        enum GeneratedField {
1839            Start,
1840            Commit,
1841        }
1842        impl<'de> serde::Deserialize<'de> for GeneratedField {
1843            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
1844            where
1845                D: serde::Deserializer<'de>,
1846            {
1847                struct GeneratedVisitor;
1848
1849                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1850                    type Value = GeneratedField;
1851
1852                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1853                        write!(formatter, "expected one of: {:?}", &FIELDS)
1854                    }
1855
1856                    #[allow(unused_variables)]
1857                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
1858                    where
1859                        E: serde::de::Error,
1860                    {
1861                        match value {
1862                            "start" => Ok(GeneratedField::Start),
1863                            "commit" => Ok(GeneratedField::Commit),
1864                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
1865                        }
1866                    }
1867                }
1868                deserializer.deserialize_identifier(GeneratedVisitor)
1869            }
1870        }
1871        struct GeneratedVisitor;
1872        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1873            type Value = SinkCoordinatorStreamResponse;
1874
1875            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1876                formatter.write_str("struct connector_service.SinkCoordinatorStreamResponse")
1877            }
1878
1879            fn visit_map<V>(self, mut map_: V) -> std::result::Result<SinkCoordinatorStreamResponse, V::Error>
1880                where
1881                    V: serde::de::MapAccess<'de>,
1882            {
1883                let mut response__ = None;
1884                while let Some(k) = map_.next_key()? {
1885                    match k {
1886                        GeneratedField::Start => {
1887                            if response__.is_some() {
1888                                return Err(serde::de::Error::duplicate_field("start"));
1889                            }
1890                            response__ = map_.next_value::<::std::option::Option<_>>()?.map(sink_coordinator_stream_response::Response::Start)
1891;
1892                        }
1893                        GeneratedField::Commit => {
1894                            if response__.is_some() {
1895                                return Err(serde::de::Error::duplicate_field("commit"));
1896                            }
1897                            response__ = map_.next_value::<::std::option::Option<_>>()?.map(sink_coordinator_stream_response::Response::Commit)
1898;
1899                        }
1900                    }
1901                }
1902                Ok(SinkCoordinatorStreamResponse {
1903                    response: response__,
1904                })
1905            }
1906        }
1907        deserializer.deserialize_struct("connector_service.SinkCoordinatorStreamResponse", FIELDS, GeneratedVisitor)
1908    }
1909}
1910impl serde::Serialize for sink_coordinator_stream_response::CommitResponse {
1911    #[allow(deprecated)]
1912    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
1913    where
1914        S: serde::Serializer,
1915    {
1916        use serde::ser::SerializeStruct;
1917        let mut len = 0;
1918        if self.epoch != 0 {
1919            len += 1;
1920        }
1921        let mut struct_ser = serializer.serialize_struct("connector_service.SinkCoordinatorStreamResponse.CommitResponse", len)?;
1922        if self.epoch != 0 {
1923            #[allow(clippy::needless_borrow)]
1924            #[allow(clippy::needless_borrows_for_generic_args)]
1925            struct_ser.serialize_field("epoch", ToString::to_string(&self.epoch).as_str())?;
1926        }
1927        struct_ser.end()
1928    }
1929}
1930impl<'de> serde::Deserialize<'de> for sink_coordinator_stream_response::CommitResponse {
1931    #[allow(deprecated)]
1932    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
1933    where
1934        D: serde::Deserializer<'de>,
1935    {
1936        const FIELDS: &[&str] = &[
1937            "epoch",
1938        ];
1939
1940        #[allow(clippy::enum_variant_names)]
1941        enum GeneratedField {
1942            Epoch,
1943        }
1944        impl<'de> serde::Deserialize<'de> for GeneratedField {
1945            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
1946            where
1947                D: serde::Deserializer<'de>,
1948            {
1949                struct GeneratedVisitor;
1950
1951                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1952                    type Value = GeneratedField;
1953
1954                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1955                        write!(formatter, "expected one of: {:?}", &FIELDS)
1956                    }
1957
1958                    #[allow(unused_variables)]
1959                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
1960                    where
1961                        E: serde::de::Error,
1962                    {
1963                        match value {
1964                            "epoch" => Ok(GeneratedField::Epoch),
1965                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
1966                        }
1967                    }
1968                }
1969                deserializer.deserialize_identifier(GeneratedVisitor)
1970            }
1971        }
1972        struct GeneratedVisitor;
1973        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
1974            type Value = sink_coordinator_stream_response::CommitResponse;
1975
1976            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1977                formatter.write_str("struct connector_service.SinkCoordinatorStreamResponse.CommitResponse")
1978            }
1979
1980            fn visit_map<V>(self, mut map_: V) -> std::result::Result<sink_coordinator_stream_response::CommitResponse, V::Error>
1981                where
1982                    V: serde::de::MapAccess<'de>,
1983            {
1984                let mut epoch__ = None;
1985                while let Some(k) = map_.next_key()? {
1986                    match k {
1987                        GeneratedField::Epoch => {
1988                            if epoch__.is_some() {
1989                                return Err(serde::de::Error::duplicate_field("epoch"));
1990                            }
1991                            epoch__ = 
1992                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
1993                            ;
1994                        }
1995                    }
1996                }
1997                Ok(sink_coordinator_stream_response::CommitResponse {
1998                    epoch: epoch__.unwrap_or_default(),
1999                })
2000            }
2001        }
2002        deserializer.deserialize_struct("connector_service.SinkCoordinatorStreamResponse.CommitResponse", FIELDS, GeneratedVisitor)
2003    }
2004}
2005impl serde::Serialize for sink_coordinator_stream_response::StartResponse {
2006    #[allow(deprecated)]
2007    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
2008    where
2009        S: serde::Serializer,
2010    {
2011        use serde::ser::SerializeStruct;
2012        let len = 0;
2013        let struct_ser = serializer.serialize_struct("connector_service.SinkCoordinatorStreamResponse.StartResponse", len)?;
2014        struct_ser.end()
2015    }
2016}
2017impl<'de> serde::Deserialize<'de> for sink_coordinator_stream_response::StartResponse {
2018    #[allow(deprecated)]
2019    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
2020    where
2021        D: serde::Deserializer<'de>,
2022    {
2023        const FIELDS: &[&str] = &[
2024        ];
2025
2026        #[allow(clippy::enum_variant_names)]
2027        enum GeneratedField {
2028        }
2029        impl<'de> serde::Deserialize<'de> for GeneratedField {
2030            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
2031            where
2032                D: serde::Deserializer<'de>,
2033            {
2034                struct GeneratedVisitor;
2035
2036                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2037                    type Value = GeneratedField;
2038
2039                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2040                        write!(formatter, "expected one of: {:?}", &FIELDS)
2041                    }
2042
2043                    #[allow(unused_variables)]
2044                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
2045                    where
2046                        E: serde::de::Error,
2047                    {
2048                            Err(serde::de::Error::unknown_field(value, FIELDS))
2049                    }
2050                }
2051                deserializer.deserialize_identifier(GeneratedVisitor)
2052            }
2053        }
2054        struct GeneratedVisitor;
2055        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2056            type Value = sink_coordinator_stream_response::StartResponse;
2057
2058            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2059                formatter.write_str("struct connector_service.SinkCoordinatorStreamResponse.StartResponse")
2060            }
2061
2062            fn visit_map<V>(self, mut map_: V) -> std::result::Result<sink_coordinator_stream_response::StartResponse, V::Error>
2063                where
2064                    V: serde::de::MapAccess<'de>,
2065            {
2066                while map_.next_key::<GeneratedField>()?.is_some() {
2067                    let _ = map_.next_value::<serde::de::IgnoredAny>()?;
2068                }
2069                Ok(sink_coordinator_stream_response::StartResponse {
2070                })
2071            }
2072        }
2073        deserializer.deserialize_struct("connector_service.SinkCoordinatorStreamResponse.StartResponse", FIELDS, GeneratedVisitor)
2074    }
2075}
2076impl serde::Serialize for SinkMetadata {
2077    #[allow(deprecated)]
2078    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
2079    where
2080        S: serde::Serializer,
2081    {
2082        use serde::ser::SerializeStruct;
2083        let mut len = 0;
2084        if self.metadata.is_some() {
2085            len += 1;
2086        }
2087        let mut struct_ser = serializer.serialize_struct("connector_service.SinkMetadata", len)?;
2088        if let Some(v) = self.metadata.as_ref() {
2089            match v {
2090                sink_metadata::Metadata::Serialized(v) => {
2091                    struct_ser.serialize_field("serialized", v)?;
2092                }
2093            }
2094        }
2095        struct_ser.end()
2096    }
2097}
2098impl<'de> serde::Deserialize<'de> for SinkMetadata {
2099    #[allow(deprecated)]
2100    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
2101    where
2102        D: serde::Deserializer<'de>,
2103    {
2104        const FIELDS: &[&str] = &[
2105            "serialized",
2106        ];
2107
2108        #[allow(clippy::enum_variant_names)]
2109        enum GeneratedField {
2110            Serialized,
2111        }
2112        impl<'de> serde::Deserialize<'de> for GeneratedField {
2113            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
2114            where
2115                D: serde::Deserializer<'de>,
2116            {
2117                struct GeneratedVisitor;
2118
2119                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2120                    type Value = GeneratedField;
2121
2122                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2123                        write!(formatter, "expected one of: {:?}", &FIELDS)
2124                    }
2125
2126                    #[allow(unused_variables)]
2127                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
2128                    where
2129                        E: serde::de::Error,
2130                    {
2131                        match value {
2132                            "serialized" => Ok(GeneratedField::Serialized),
2133                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
2134                        }
2135                    }
2136                }
2137                deserializer.deserialize_identifier(GeneratedVisitor)
2138            }
2139        }
2140        struct GeneratedVisitor;
2141        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2142            type Value = SinkMetadata;
2143
2144            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2145                formatter.write_str("struct connector_service.SinkMetadata")
2146            }
2147
2148            fn visit_map<V>(self, mut map_: V) -> std::result::Result<SinkMetadata, V::Error>
2149                where
2150                    V: serde::de::MapAccess<'de>,
2151            {
2152                let mut metadata__ = None;
2153                while let Some(k) = map_.next_key()? {
2154                    match k {
2155                        GeneratedField::Serialized => {
2156                            if metadata__.is_some() {
2157                                return Err(serde::de::Error::duplicate_field("serialized"));
2158                            }
2159                            metadata__ = map_.next_value::<::std::option::Option<_>>()?.map(sink_metadata::Metadata::Serialized)
2160;
2161                        }
2162                    }
2163                }
2164                Ok(SinkMetadata {
2165                    metadata: metadata__,
2166                })
2167            }
2168        }
2169        deserializer.deserialize_struct("connector_service.SinkMetadata", FIELDS, GeneratedVisitor)
2170    }
2171}
2172impl serde::Serialize for sink_metadata::SerializedMetadata {
2173    #[allow(deprecated)]
2174    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
2175    where
2176        S: serde::Serializer,
2177    {
2178        use serde::ser::SerializeStruct;
2179        let mut len = 0;
2180        if !self.metadata.is_empty() {
2181            len += 1;
2182        }
2183        let mut struct_ser = serializer.serialize_struct("connector_service.SinkMetadata.SerializedMetadata", len)?;
2184        if !self.metadata.is_empty() {
2185            #[allow(clippy::needless_borrow)]
2186            #[allow(clippy::needless_borrows_for_generic_args)]
2187            struct_ser.serialize_field("metadata", pbjson::private::base64::encode(&self.metadata).as_str())?;
2188        }
2189        struct_ser.end()
2190    }
2191}
2192impl<'de> serde::Deserialize<'de> for sink_metadata::SerializedMetadata {
2193    #[allow(deprecated)]
2194    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
2195    where
2196        D: serde::Deserializer<'de>,
2197    {
2198        const FIELDS: &[&str] = &[
2199            "metadata",
2200        ];
2201
2202        #[allow(clippy::enum_variant_names)]
2203        enum GeneratedField {
2204            Metadata,
2205        }
2206        impl<'de> serde::Deserialize<'de> for GeneratedField {
2207            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
2208            where
2209                D: serde::Deserializer<'de>,
2210            {
2211                struct GeneratedVisitor;
2212
2213                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2214                    type Value = GeneratedField;
2215
2216                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2217                        write!(formatter, "expected one of: {:?}", &FIELDS)
2218                    }
2219
2220                    #[allow(unused_variables)]
2221                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
2222                    where
2223                        E: serde::de::Error,
2224                    {
2225                        match value {
2226                            "metadata" => Ok(GeneratedField::Metadata),
2227                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
2228                        }
2229                    }
2230                }
2231                deserializer.deserialize_identifier(GeneratedVisitor)
2232            }
2233        }
2234        struct GeneratedVisitor;
2235        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2236            type Value = sink_metadata::SerializedMetadata;
2237
2238            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2239                formatter.write_str("struct connector_service.SinkMetadata.SerializedMetadata")
2240            }
2241
2242            fn visit_map<V>(self, mut map_: V) -> std::result::Result<sink_metadata::SerializedMetadata, V::Error>
2243                where
2244                    V: serde::de::MapAccess<'de>,
2245            {
2246                let mut metadata__ = None;
2247                while let Some(k) = map_.next_key()? {
2248                    match k {
2249                        GeneratedField::Metadata => {
2250                            if metadata__.is_some() {
2251                                return Err(serde::de::Error::duplicate_field("metadata"));
2252                            }
2253                            metadata__ = 
2254                                Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
2255                            ;
2256                        }
2257                    }
2258                }
2259                Ok(sink_metadata::SerializedMetadata {
2260                    metadata: metadata__.unwrap_or_default(),
2261                })
2262            }
2263        }
2264        deserializer.deserialize_struct("connector_service.SinkMetadata.SerializedMetadata", FIELDS, GeneratedVisitor)
2265    }
2266}
2267impl serde::Serialize for SinkParam {
2268    #[allow(deprecated)]
2269    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
2270    where
2271        S: serde::Serializer,
2272    {
2273        use serde::ser::SerializeStruct;
2274        let mut len = 0;
2275        if self.sink_id != 0 {
2276            len += 1;
2277        }
2278        if !self.properties.is_empty() {
2279            len += 1;
2280        }
2281        if self.table_schema.is_some() {
2282            len += 1;
2283        }
2284        if self.sink_type != 0 {
2285            len += 1;
2286        }
2287        if !self.db_name.is_empty() {
2288            len += 1;
2289        }
2290        if !self.sink_from_name.is_empty() {
2291            len += 1;
2292        }
2293        if self.format_desc.is_some() {
2294            len += 1;
2295        }
2296        if !self.sink_name.is_empty() {
2297            len += 1;
2298        }
2299        let mut struct_ser = serializer.serialize_struct("connector_service.SinkParam", len)?;
2300        if self.sink_id != 0 {
2301            struct_ser.serialize_field("sinkId", &self.sink_id)?;
2302        }
2303        if !self.properties.is_empty() {
2304            struct_ser.serialize_field("properties", &self.properties)?;
2305        }
2306        if let Some(v) = self.table_schema.as_ref() {
2307            struct_ser.serialize_field("tableSchema", v)?;
2308        }
2309        if self.sink_type != 0 {
2310            let v = super::catalog::SinkType::try_from(self.sink_type)
2311                .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.sink_type)))?;
2312            struct_ser.serialize_field("sinkType", &v)?;
2313        }
2314        if !self.db_name.is_empty() {
2315            struct_ser.serialize_field("dbName", &self.db_name)?;
2316        }
2317        if !self.sink_from_name.is_empty() {
2318            struct_ser.serialize_field("sinkFromName", &self.sink_from_name)?;
2319        }
2320        if let Some(v) = self.format_desc.as_ref() {
2321            struct_ser.serialize_field("formatDesc", v)?;
2322        }
2323        if !self.sink_name.is_empty() {
2324            struct_ser.serialize_field("sinkName", &self.sink_name)?;
2325        }
2326        struct_ser.end()
2327    }
2328}
2329impl<'de> serde::Deserialize<'de> for SinkParam {
2330    #[allow(deprecated)]
2331    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
2332    where
2333        D: serde::Deserializer<'de>,
2334    {
2335        const FIELDS: &[&str] = &[
2336            "sink_id",
2337            "sinkId",
2338            "properties",
2339            "table_schema",
2340            "tableSchema",
2341            "sink_type",
2342            "sinkType",
2343            "db_name",
2344            "dbName",
2345            "sink_from_name",
2346            "sinkFromName",
2347            "format_desc",
2348            "formatDesc",
2349            "sink_name",
2350            "sinkName",
2351        ];
2352
2353        #[allow(clippy::enum_variant_names)]
2354        enum GeneratedField {
2355            SinkId,
2356            Properties,
2357            TableSchema,
2358            SinkType,
2359            DbName,
2360            SinkFromName,
2361            FormatDesc,
2362            SinkName,
2363        }
2364        impl<'de> serde::Deserialize<'de> for GeneratedField {
2365            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
2366            where
2367                D: serde::Deserializer<'de>,
2368            {
2369                struct GeneratedVisitor;
2370
2371                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2372                    type Value = GeneratedField;
2373
2374                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2375                        write!(formatter, "expected one of: {:?}", &FIELDS)
2376                    }
2377
2378                    #[allow(unused_variables)]
2379                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
2380                    where
2381                        E: serde::de::Error,
2382                    {
2383                        match value {
2384                            "sinkId" | "sink_id" => Ok(GeneratedField::SinkId),
2385                            "properties" => Ok(GeneratedField::Properties),
2386                            "tableSchema" | "table_schema" => Ok(GeneratedField::TableSchema),
2387                            "sinkType" | "sink_type" => Ok(GeneratedField::SinkType),
2388                            "dbName" | "db_name" => Ok(GeneratedField::DbName),
2389                            "sinkFromName" | "sink_from_name" => Ok(GeneratedField::SinkFromName),
2390                            "formatDesc" | "format_desc" => Ok(GeneratedField::FormatDesc),
2391                            "sinkName" | "sink_name" => Ok(GeneratedField::SinkName),
2392                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
2393                        }
2394                    }
2395                }
2396                deserializer.deserialize_identifier(GeneratedVisitor)
2397            }
2398        }
2399        struct GeneratedVisitor;
2400        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2401            type Value = SinkParam;
2402
2403            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2404                formatter.write_str("struct connector_service.SinkParam")
2405            }
2406
2407            fn visit_map<V>(self, mut map_: V) -> std::result::Result<SinkParam, V::Error>
2408                where
2409                    V: serde::de::MapAccess<'de>,
2410            {
2411                let mut sink_id__ = None;
2412                let mut properties__ = None;
2413                let mut table_schema__ = None;
2414                let mut sink_type__ = None;
2415                let mut db_name__ = None;
2416                let mut sink_from_name__ = None;
2417                let mut format_desc__ = None;
2418                let mut sink_name__ = None;
2419                while let Some(k) = map_.next_key()? {
2420                    match k {
2421                        GeneratedField::SinkId => {
2422                            if sink_id__.is_some() {
2423                                return Err(serde::de::Error::duplicate_field("sinkId"));
2424                            }
2425                            sink_id__ = 
2426                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
2427                            ;
2428                        }
2429                        GeneratedField::Properties => {
2430                            if properties__.is_some() {
2431                                return Err(serde::de::Error::duplicate_field("properties"));
2432                            }
2433                            properties__ = Some(
2434                                map_.next_value::<std::collections::BTreeMap<_, _>>()?
2435                            );
2436                        }
2437                        GeneratedField::TableSchema => {
2438                            if table_schema__.is_some() {
2439                                return Err(serde::de::Error::duplicate_field("tableSchema"));
2440                            }
2441                            table_schema__ = map_.next_value()?;
2442                        }
2443                        GeneratedField::SinkType => {
2444                            if sink_type__.is_some() {
2445                                return Err(serde::de::Error::duplicate_field("sinkType"));
2446                            }
2447                            sink_type__ = Some(map_.next_value::<super::catalog::SinkType>()? as i32);
2448                        }
2449                        GeneratedField::DbName => {
2450                            if db_name__.is_some() {
2451                                return Err(serde::de::Error::duplicate_field("dbName"));
2452                            }
2453                            db_name__ = Some(map_.next_value()?);
2454                        }
2455                        GeneratedField::SinkFromName => {
2456                            if sink_from_name__.is_some() {
2457                                return Err(serde::de::Error::duplicate_field("sinkFromName"));
2458                            }
2459                            sink_from_name__ = Some(map_.next_value()?);
2460                        }
2461                        GeneratedField::FormatDesc => {
2462                            if format_desc__.is_some() {
2463                                return Err(serde::de::Error::duplicate_field("formatDesc"));
2464                            }
2465                            format_desc__ = map_.next_value()?;
2466                        }
2467                        GeneratedField::SinkName => {
2468                            if sink_name__.is_some() {
2469                                return Err(serde::de::Error::duplicate_field("sinkName"));
2470                            }
2471                            sink_name__ = Some(map_.next_value()?);
2472                        }
2473                    }
2474                }
2475                Ok(SinkParam {
2476                    sink_id: sink_id__.unwrap_or_default(),
2477                    properties: properties__.unwrap_or_default(),
2478                    table_schema: table_schema__,
2479                    sink_type: sink_type__.unwrap_or_default(),
2480                    db_name: db_name__.unwrap_or_default(),
2481                    sink_from_name: sink_from_name__.unwrap_or_default(),
2482                    format_desc: format_desc__,
2483                    sink_name: sink_name__.unwrap_or_default(),
2484                })
2485            }
2486        }
2487        deserializer.deserialize_struct("connector_service.SinkParam", FIELDS, GeneratedVisitor)
2488    }
2489}
2490impl serde::Serialize for SinkWriterStreamRequest {
2491    #[allow(deprecated)]
2492    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
2493    where
2494        S: serde::Serializer,
2495    {
2496        use serde::ser::SerializeStruct;
2497        let mut len = 0;
2498        if self.request.is_some() {
2499            len += 1;
2500        }
2501        let mut struct_ser = serializer.serialize_struct("connector_service.SinkWriterStreamRequest", len)?;
2502        if let Some(v) = self.request.as_ref() {
2503            match v {
2504                sink_writer_stream_request::Request::Start(v) => {
2505                    struct_ser.serialize_field("start", v)?;
2506                }
2507                sink_writer_stream_request::Request::WriteBatch(v) => {
2508                    struct_ser.serialize_field("writeBatch", v)?;
2509                }
2510                sink_writer_stream_request::Request::Barrier(v) => {
2511                    struct_ser.serialize_field("barrier", v)?;
2512                }
2513            }
2514        }
2515        struct_ser.end()
2516    }
2517}
2518impl<'de> serde::Deserialize<'de> for SinkWriterStreamRequest {
2519    #[allow(deprecated)]
2520    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
2521    where
2522        D: serde::Deserializer<'de>,
2523    {
2524        const FIELDS: &[&str] = &[
2525            "start",
2526            "write_batch",
2527            "writeBatch",
2528            "barrier",
2529        ];
2530
2531        #[allow(clippy::enum_variant_names)]
2532        enum GeneratedField {
2533            Start,
2534            WriteBatch,
2535            Barrier,
2536        }
2537        impl<'de> serde::Deserialize<'de> for GeneratedField {
2538            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
2539            where
2540                D: serde::Deserializer<'de>,
2541            {
2542                struct GeneratedVisitor;
2543
2544                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2545                    type Value = GeneratedField;
2546
2547                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2548                        write!(formatter, "expected one of: {:?}", &FIELDS)
2549                    }
2550
2551                    #[allow(unused_variables)]
2552                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
2553                    where
2554                        E: serde::de::Error,
2555                    {
2556                        match value {
2557                            "start" => Ok(GeneratedField::Start),
2558                            "writeBatch" | "write_batch" => Ok(GeneratedField::WriteBatch),
2559                            "barrier" => Ok(GeneratedField::Barrier),
2560                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
2561                        }
2562                    }
2563                }
2564                deserializer.deserialize_identifier(GeneratedVisitor)
2565            }
2566        }
2567        struct GeneratedVisitor;
2568        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2569            type Value = SinkWriterStreamRequest;
2570
2571            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2572                formatter.write_str("struct connector_service.SinkWriterStreamRequest")
2573            }
2574
2575            fn visit_map<V>(self, mut map_: V) -> std::result::Result<SinkWriterStreamRequest, V::Error>
2576                where
2577                    V: serde::de::MapAccess<'de>,
2578            {
2579                let mut request__ = None;
2580                while let Some(k) = map_.next_key()? {
2581                    match k {
2582                        GeneratedField::Start => {
2583                            if request__.is_some() {
2584                                return Err(serde::de::Error::duplicate_field("start"));
2585                            }
2586                            request__ = map_.next_value::<::std::option::Option<_>>()?.map(sink_writer_stream_request::Request::Start)
2587;
2588                        }
2589                        GeneratedField::WriteBatch => {
2590                            if request__.is_some() {
2591                                return Err(serde::de::Error::duplicate_field("writeBatch"));
2592                            }
2593                            request__ = map_.next_value::<::std::option::Option<_>>()?.map(sink_writer_stream_request::Request::WriteBatch)
2594;
2595                        }
2596                        GeneratedField::Barrier => {
2597                            if request__.is_some() {
2598                                return Err(serde::de::Error::duplicate_field("barrier"));
2599                            }
2600                            request__ = map_.next_value::<::std::option::Option<_>>()?.map(sink_writer_stream_request::Request::Barrier)
2601;
2602                        }
2603                    }
2604                }
2605                Ok(SinkWriterStreamRequest {
2606                    request: request__,
2607                })
2608            }
2609        }
2610        deserializer.deserialize_struct("connector_service.SinkWriterStreamRequest", FIELDS, GeneratedVisitor)
2611    }
2612}
2613impl serde::Serialize for sink_writer_stream_request::Barrier {
2614    #[allow(deprecated)]
2615    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
2616    where
2617        S: serde::Serializer,
2618    {
2619        use serde::ser::SerializeStruct;
2620        let mut len = 0;
2621        if self.epoch != 0 {
2622            len += 1;
2623        }
2624        if self.is_checkpoint {
2625            len += 1;
2626        }
2627        let mut struct_ser = serializer.serialize_struct("connector_service.SinkWriterStreamRequest.Barrier", len)?;
2628        if self.epoch != 0 {
2629            #[allow(clippy::needless_borrow)]
2630            #[allow(clippy::needless_borrows_for_generic_args)]
2631            struct_ser.serialize_field("epoch", ToString::to_string(&self.epoch).as_str())?;
2632        }
2633        if self.is_checkpoint {
2634            struct_ser.serialize_field("isCheckpoint", &self.is_checkpoint)?;
2635        }
2636        struct_ser.end()
2637    }
2638}
2639impl<'de> serde::Deserialize<'de> for sink_writer_stream_request::Barrier {
2640    #[allow(deprecated)]
2641    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
2642    where
2643        D: serde::Deserializer<'de>,
2644    {
2645        const FIELDS: &[&str] = &[
2646            "epoch",
2647            "is_checkpoint",
2648            "isCheckpoint",
2649        ];
2650
2651        #[allow(clippy::enum_variant_names)]
2652        enum GeneratedField {
2653            Epoch,
2654            IsCheckpoint,
2655        }
2656        impl<'de> serde::Deserialize<'de> for GeneratedField {
2657            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
2658            where
2659                D: serde::Deserializer<'de>,
2660            {
2661                struct GeneratedVisitor;
2662
2663                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2664                    type Value = GeneratedField;
2665
2666                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2667                        write!(formatter, "expected one of: {:?}", &FIELDS)
2668                    }
2669
2670                    #[allow(unused_variables)]
2671                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
2672                    where
2673                        E: serde::de::Error,
2674                    {
2675                        match value {
2676                            "epoch" => Ok(GeneratedField::Epoch),
2677                            "isCheckpoint" | "is_checkpoint" => Ok(GeneratedField::IsCheckpoint),
2678                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
2679                        }
2680                    }
2681                }
2682                deserializer.deserialize_identifier(GeneratedVisitor)
2683            }
2684        }
2685        struct GeneratedVisitor;
2686        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2687            type Value = sink_writer_stream_request::Barrier;
2688
2689            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2690                formatter.write_str("struct connector_service.SinkWriterStreamRequest.Barrier")
2691            }
2692
2693            fn visit_map<V>(self, mut map_: V) -> std::result::Result<sink_writer_stream_request::Barrier, V::Error>
2694                where
2695                    V: serde::de::MapAccess<'de>,
2696            {
2697                let mut epoch__ = None;
2698                let mut is_checkpoint__ = None;
2699                while let Some(k) = map_.next_key()? {
2700                    match k {
2701                        GeneratedField::Epoch => {
2702                            if epoch__.is_some() {
2703                                return Err(serde::de::Error::duplicate_field("epoch"));
2704                            }
2705                            epoch__ = 
2706                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
2707                            ;
2708                        }
2709                        GeneratedField::IsCheckpoint => {
2710                            if is_checkpoint__.is_some() {
2711                                return Err(serde::de::Error::duplicate_field("isCheckpoint"));
2712                            }
2713                            is_checkpoint__ = Some(map_.next_value()?);
2714                        }
2715                    }
2716                }
2717                Ok(sink_writer_stream_request::Barrier {
2718                    epoch: epoch__.unwrap_or_default(),
2719                    is_checkpoint: is_checkpoint__.unwrap_or_default(),
2720                })
2721            }
2722        }
2723        deserializer.deserialize_struct("connector_service.SinkWriterStreamRequest.Barrier", FIELDS, GeneratedVisitor)
2724    }
2725}
2726impl serde::Serialize for sink_writer_stream_request::StartSink {
2727    #[allow(deprecated)]
2728    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
2729    where
2730        S: serde::Serializer,
2731    {
2732        use serde::ser::SerializeStruct;
2733        let mut len = 0;
2734        if self.sink_param.is_some() {
2735            len += 1;
2736        }
2737        if self.payload_schema.is_some() {
2738            len += 1;
2739        }
2740        let mut struct_ser = serializer.serialize_struct("connector_service.SinkWriterStreamRequest.StartSink", len)?;
2741        if let Some(v) = self.sink_param.as_ref() {
2742            struct_ser.serialize_field("sinkParam", v)?;
2743        }
2744        if let Some(v) = self.payload_schema.as_ref() {
2745            struct_ser.serialize_field("payloadSchema", v)?;
2746        }
2747        struct_ser.end()
2748    }
2749}
2750impl<'de> serde::Deserialize<'de> for sink_writer_stream_request::StartSink {
2751    #[allow(deprecated)]
2752    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
2753    where
2754        D: serde::Deserializer<'de>,
2755    {
2756        const FIELDS: &[&str] = &[
2757            "sink_param",
2758            "sinkParam",
2759            "payload_schema",
2760            "payloadSchema",
2761        ];
2762
2763        #[allow(clippy::enum_variant_names)]
2764        enum GeneratedField {
2765            SinkParam,
2766            PayloadSchema,
2767        }
2768        impl<'de> serde::Deserialize<'de> for GeneratedField {
2769            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
2770            where
2771                D: serde::Deserializer<'de>,
2772            {
2773                struct GeneratedVisitor;
2774
2775                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2776                    type Value = GeneratedField;
2777
2778                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2779                        write!(formatter, "expected one of: {:?}", &FIELDS)
2780                    }
2781
2782                    #[allow(unused_variables)]
2783                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
2784                    where
2785                        E: serde::de::Error,
2786                    {
2787                        match value {
2788                            "sinkParam" | "sink_param" => Ok(GeneratedField::SinkParam),
2789                            "payloadSchema" | "payload_schema" => Ok(GeneratedField::PayloadSchema),
2790                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
2791                        }
2792                    }
2793                }
2794                deserializer.deserialize_identifier(GeneratedVisitor)
2795            }
2796        }
2797        struct GeneratedVisitor;
2798        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2799            type Value = sink_writer_stream_request::StartSink;
2800
2801            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2802                formatter.write_str("struct connector_service.SinkWriterStreamRequest.StartSink")
2803            }
2804
2805            fn visit_map<V>(self, mut map_: V) -> std::result::Result<sink_writer_stream_request::StartSink, V::Error>
2806                where
2807                    V: serde::de::MapAccess<'de>,
2808            {
2809                let mut sink_param__ = None;
2810                let mut payload_schema__ = None;
2811                while let Some(k) = map_.next_key()? {
2812                    match k {
2813                        GeneratedField::SinkParam => {
2814                            if sink_param__.is_some() {
2815                                return Err(serde::de::Error::duplicate_field("sinkParam"));
2816                            }
2817                            sink_param__ = map_.next_value()?;
2818                        }
2819                        GeneratedField::PayloadSchema => {
2820                            if payload_schema__.is_some() {
2821                                return Err(serde::de::Error::duplicate_field("payloadSchema"));
2822                            }
2823                            payload_schema__ = map_.next_value()?;
2824                        }
2825                    }
2826                }
2827                Ok(sink_writer_stream_request::StartSink {
2828                    sink_param: sink_param__,
2829                    payload_schema: payload_schema__,
2830                })
2831            }
2832        }
2833        deserializer.deserialize_struct("connector_service.SinkWriterStreamRequest.StartSink", FIELDS, GeneratedVisitor)
2834    }
2835}
2836impl serde::Serialize for sink_writer_stream_request::WriteBatch {
2837    #[allow(deprecated)]
2838    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
2839    where
2840        S: serde::Serializer,
2841    {
2842        use serde::ser::SerializeStruct;
2843        let mut len = 0;
2844        if self.batch_id != 0 {
2845            len += 1;
2846        }
2847        if self.epoch != 0 {
2848            len += 1;
2849        }
2850        if self.payload.is_some() {
2851            len += 1;
2852        }
2853        let mut struct_ser = serializer.serialize_struct("connector_service.SinkWriterStreamRequest.WriteBatch", len)?;
2854        if self.batch_id != 0 {
2855            #[allow(clippy::needless_borrow)]
2856            #[allow(clippy::needless_borrows_for_generic_args)]
2857            struct_ser.serialize_field("batchId", ToString::to_string(&self.batch_id).as_str())?;
2858        }
2859        if self.epoch != 0 {
2860            #[allow(clippy::needless_borrow)]
2861            #[allow(clippy::needless_borrows_for_generic_args)]
2862            struct_ser.serialize_field("epoch", ToString::to_string(&self.epoch).as_str())?;
2863        }
2864        if let Some(v) = self.payload.as_ref() {
2865            match v {
2866                sink_writer_stream_request::write_batch::Payload::StreamChunkPayload(v) => {
2867                    struct_ser.serialize_field("streamChunkPayload", v)?;
2868                }
2869                sink_writer_stream_request::write_batch::Payload::StreamChunkRefPointer(v) => {
2870                    #[allow(clippy::needless_borrow)]
2871                    #[allow(clippy::needless_borrows_for_generic_args)]
2872                    struct_ser.serialize_field("streamChunkRefPointer", ToString::to_string(&v).as_str())?;
2873                }
2874            }
2875        }
2876        struct_ser.end()
2877    }
2878}
2879impl<'de> serde::Deserialize<'de> for sink_writer_stream_request::WriteBatch {
2880    #[allow(deprecated)]
2881    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
2882    where
2883        D: serde::Deserializer<'de>,
2884    {
2885        const FIELDS: &[&str] = &[
2886            "batch_id",
2887            "batchId",
2888            "epoch",
2889            "stream_chunk_payload",
2890            "streamChunkPayload",
2891            "stream_chunk_ref_pointer",
2892            "streamChunkRefPointer",
2893        ];
2894
2895        #[allow(clippy::enum_variant_names)]
2896        enum GeneratedField {
2897            BatchId,
2898            Epoch,
2899            StreamChunkPayload,
2900            StreamChunkRefPointer,
2901        }
2902        impl<'de> serde::Deserialize<'de> for GeneratedField {
2903            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
2904            where
2905                D: serde::Deserializer<'de>,
2906            {
2907                struct GeneratedVisitor;
2908
2909                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2910                    type Value = GeneratedField;
2911
2912                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2913                        write!(formatter, "expected one of: {:?}", &FIELDS)
2914                    }
2915
2916                    #[allow(unused_variables)]
2917                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
2918                    where
2919                        E: serde::de::Error,
2920                    {
2921                        match value {
2922                            "batchId" | "batch_id" => Ok(GeneratedField::BatchId),
2923                            "epoch" => Ok(GeneratedField::Epoch),
2924                            "streamChunkPayload" | "stream_chunk_payload" => Ok(GeneratedField::StreamChunkPayload),
2925                            "streamChunkRefPointer" | "stream_chunk_ref_pointer" => Ok(GeneratedField::StreamChunkRefPointer),
2926                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
2927                        }
2928                    }
2929                }
2930                deserializer.deserialize_identifier(GeneratedVisitor)
2931            }
2932        }
2933        struct GeneratedVisitor;
2934        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
2935            type Value = sink_writer_stream_request::WriteBatch;
2936
2937            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2938                formatter.write_str("struct connector_service.SinkWriterStreamRequest.WriteBatch")
2939            }
2940
2941            fn visit_map<V>(self, mut map_: V) -> std::result::Result<sink_writer_stream_request::WriteBatch, V::Error>
2942                where
2943                    V: serde::de::MapAccess<'de>,
2944            {
2945                let mut batch_id__ = None;
2946                let mut epoch__ = None;
2947                let mut payload__ = None;
2948                while let Some(k) = map_.next_key()? {
2949                    match k {
2950                        GeneratedField::BatchId => {
2951                            if batch_id__.is_some() {
2952                                return Err(serde::de::Error::duplicate_field("batchId"));
2953                            }
2954                            batch_id__ = 
2955                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
2956                            ;
2957                        }
2958                        GeneratedField::Epoch => {
2959                            if epoch__.is_some() {
2960                                return Err(serde::de::Error::duplicate_field("epoch"));
2961                            }
2962                            epoch__ = 
2963                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
2964                            ;
2965                        }
2966                        GeneratedField::StreamChunkPayload => {
2967                            if payload__.is_some() {
2968                                return Err(serde::de::Error::duplicate_field("streamChunkPayload"));
2969                            }
2970                            payload__ = map_.next_value::<::std::option::Option<_>>()?.map(sink_writer_stream_request::write_batch::Payload::StreamChunkPayload)
2971;
2972                        }
2973                        GeneratedField::StreamChunkRefPointer => {
2974                            if payload__.is_some() {
2975                                return Err(serde::de::Error::duplicate_field("streamChunkRefPointer"));
2976                            }
2977                            payload__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| sink_writer_stream_request::write_batch::Payload::StreamChunkRefPointer(x.0));
2978                        }
2979                    }
2980                }
2981                Ok(sink_writer_stream_request::WriteBatch {
2982                    batch_id: batch_id__.unwrap_or_default(),
2983                    epoch: epoch__.unwrap_or_default(),
2984                    payload: payload__,
2985                })
2986            }
2987        }
2988        deserializer.deserialize_struct("connector_service.SinkWriterStreamRequest.WriteBatch", FIELDS, GeneratedVisitor)
2989    }
2990}
2991impl serde::Serialize for sink_writer_stream_request::write_batch::StreamChunkPayload {
2992    #[allow(deprecated)]
2993    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
2994    where
2995        S: serde::Serializer,
2996    {
2997        use serde::ser::SerializeStruct;
2998        let mut len = 0;
2999        if !self.binary_data.is_empty() {
3000            len += 1;
3001        }
3002        let mut struct_ser = serializer.serialize_struct("connector_service.SinkWriterStreamRequest.WriteBatch.StreamChunkPayload", len)?;
3003        if !self.binary_data.is_empty() {
3004            #[allow(clippy::needless_borrow)]
3005            #[allow(clippy::needless_borrows_for_generic_args)]
3006            struct_ser.serialize_field("binaryData", pbjson::private::base64::encode(&self.binary_data).as_str())?;
3007        }
3008        struct_ser.end()
3009    }
3010}
3011impl<'de> serde::Deserialize<'de> for sink_writer_stream_request::write_batch::StreamChunkPayload {
3012    #[allow(deprecated)]
3013    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
3014    where
3015        D: serde::Deserializer<'de>,
3016    {
3017        const FIELDS: &[&str] = &[
3018            "binary_data",
3019            "binaryData",
3020        ];
3021
3022        #[allow(clippy::enum_variant_names)]
3023        enum GeneratedField {
3024            BinaryData,
3025        }
3026        impl<'de> serde::Deserialize<'de> for GeneratedField {
3027            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
3028            where
3029                D: serde::Deserializer<'de>,
3030            {
3031                struct GeneratedVisitor;
3032
3033                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3034                    type Value = GeneratedField;
3035
3036                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3037                        write!(formatter, "expected one of: {:?}", &FIELDS)
3038                    }
3039
3040                    #[allow(unused_variables)]
3041                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
3042                    where
3043                        E: serde::de::Error,
3044                    {
3045                        match value {
3046                            "binaryData" | "binary_data" => Ok(GeneratedField::BinaryData),
3047                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
3048                        }
3049                    }
3050                }
3051                deserializer.deserialize_identifier(GeneratedVisitor)
3052            }
3053        }
3054        struct GeneratedVisitor;
3055        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3056            type Value = sink_writer_stream_request::write_batch::StreamChunkPayload;
3057
3058            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3059                formatter.write_str("struct connector_service.SinkWriterStreamRequest.WriteBatch.StreamChunkPayload")
3060            }
3061
3062            fn visit_map<V>(self, mut map_: V) -> std::result::Result<sink_writer_stream_request::write_batch::StreamChunkPayload, V::Error>
3063                where
3064                    V: serde::de::MapAccess<'de>,
3065            {
3066                let mut binary_data__ = None;
3067                while let Some(k) = map_.next_key()? {
3068                    match k {
3069                        GeneratedField::BinaryData => {
3070                            if binary_data__.is_some() {
3071                                return Err(serde::de::Error::duplicate_field("binaryData"));
3072                            }
3073                            binary_data__ = 
3074                                Some(map_.next_value::<::pbjson::private::BytesDeserialize<_>>()?.0)
3075                            ;
3076                        }
3077                    }
3078                }
3079                Ok(sink_writer_stream_request::write_batch::StreamChunkPayload {
3080                    binary_data: binary_data__.unwrap_or_default(),
3081                })
3082            }
3083        }
3084        deserializer.deserialize_struct("connector_service.SinkWriterStreamRequest.WriteBatch.StreamChunkPayload", FIELDS, GeneratedVisitor)
3085    }
3086}
3087impl serde::Serialize for SinkWriterStreamResponse {
3088    #[allow(deprecated)]
3089    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
3090    where
3091        S: serde::Serializer,
3092    {
3093        use serde::ser::SerializeStruct;
3094        let mut len = 0;
3095        if self.response.is_some() {
3096            len += 1;
3097        }
3098        let mut struct_ser = serializer.serialize_struct("connector_service.SinkWriterStreamResponse", len)?;
3099        if let Some(v) = self.response.as_ref() {
3100            match v {
3101                sink_writer_stream_response::Response::Start(v) => {
3102                    struct_ser.serialize_field("start", v)?;
3103                }
3104                sink_writer_stream_response::Response::Commit(v) => {
3105                    struct_ser.serialize_field("commit", v)?;
3106                }
3107                sink_writer_stream_response::Response::Batch(v) => {
3108                    struct_ser.serialize_field("batch", v)?;
3109                }
3110            }
3111        }
3112        struct_ser.end()
3113    }
3114}
3115impl<'de> serde::Deserialize<'de> for SinkWriterStreamResponse {
3116    #[allow(deprecated)]
3117    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
3118    where
3119        D: serde::Deserializer<'de>,
3120    {
3121        const FIELDS: &[&str] = &[
3122            "start",
3123            "commit",
3124            "batch",
3125        ];
3126
3127        #[allow(clippy::enum_variant_names)]
3128        enum GeneratedField {
3129            Start,
3130            Commit,
3131            Batch,
3132        }
3133        impl<'de> serde::Deserialize<'de> for GeneratedField {
3134            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
3135            where
3136                D: serde::Deserializer<'de>,
3137            {
3138                struct GeneratedVisitor;
3139
3140                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3141                    type Value = GeneratedField;
3142
3143                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3144                        write!(formatter, "expected one of: {:?}", &FIELDS)
3145                    }
3146
3147                    #[allow(unused_variables)]
3148                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
3149                    where
3150                        E: serde::de::Error,
3151                    {
3152                        match value {
3153                            "start" => Ok(GeneratedField::Start),
3154                            "commit" => Ok(GeneratedField::Commit),
3155                            "batch" => Ok(GeneratedField::Batch),
3156                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
3157                        }
3158                    }
3159                }
3160                deserializer.deserialize_identifier(GeneratedVisitor)
3161            }
3162        }
3163        struct GeneratedVisitor;
3164        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3165            type Value = SinkWriterStreamResponse;
3166
3167            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3168                formatter.write_str("struct connector_service.SinkWriterStreamResponse")
3169            }
3170
3171            fn visit_map<V>(self, mut map_: V) -> std::result::Result<SinkWriterStreamResponse, V::Error>
3172                where
3173                    V: serde::de::MapAccess<'de>,
3174            {
3175                let mut response__ = None;
3176                while let Some(k) = map_.next_key()? {
3177                    match k {
3178                        GeneratedField::Start => {
3179                            if response__.is_some() {
3180                                return Err(serde::de::Error::duplicate_field("start"));
3181                            }
3182                            response__ = map_.next_value::<::std::option::Option<_>>()?.map(sink_writer_stream_response::Response::Start)
3183;
3184                        }
3185                        GeneratedField::Commit => {
3186                            if response__.is_some() {
3187                                return Err(serde::de::Error::duplicate_field("commit"));
3188                            }
3189                            response__ = map_.next_value::<::std::option::Option<_>>()?.map(sink_writer_stream_response::Response::Commit)
3190;
3191                        }
3192                        GeneratedField::Batch => {
3193                            if response__.is_some() {
3194                                return Err(serde::de::Error::duplicate_field("batch"));
3195                            }
3196                            response__ = map_.next_value::<::std::option::Option<_>>()?.map(sink_writer_stream_response::Response::Batch)
3197;
3198                        }
3199                    }
3200                }
3201                Ok(SinkWriterStreamResponse {
3202                    response: response__,
3203                })
3204            }
3205        }
3206        deserializer.deserialize_struct("connector_service.SinkWriterStreamResponse", FIELDS, GeneratedVisitor)
3207    }
3208}
3209impl serde::Serialize for sink_writer_stream_response::BatchWrittenResponse {
3210    #[allow(deprecated)]
3211    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
3212    where
3213        S: serde::Serializer,
3214    {
3215        use serde::ser::SerializeStruct;
3216        let mut len = 0;
3217        if self.epoch != 0 {
3218            len += 1;
3219        }
3220        if self.batch_id != 0 {
3221            len += 1;
3222        }
3223        let mut struct_ser = serializer.serialize_struct("connector_service.SinkWriterStreamResponse.BatchWrittenResponse", len)?;
3224        if self.epoch != 0 {
3225            #[allow(clippy::needless_borrow)]
3226            #[allow(clippy::needless_borrows_for_generic_args)]
3227            struct_ser.serialize_field("epoch", ToString::to_string(&self.epoch).as_str())?;
3228        }
3229        if self.batch_id != 0 {
3230            #[allow(clippy::needless_borrow)]
3231            #[allow(clippy::needless_borrows_for_generic_args)]
3232            struct_ser.serialize_field("batchId", ToString::to_string(&self.batch_id).as_str())?;
3233        }
3234        struct_ser.end()
3235    }
3236}
3237impl<'de> serde::Deserialize<'de> for sink_writer_stream_response::BatchWrittenResponse {
3238    #[allow(deprecated)]
3239    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
3240    where
3241        D: serde::Deserializer<'de>,
3242    {
3243        const FIELDS: &[&str] = &[
3244            "epoch",
3245            "batch_id",
3246            "batchId",
3247        ];
3248
3249        #[allow(clippy::enum_variant_names)]
3250        enum GeneratedField {
3251            Epoch,
3252            BatchId,
3253        }
3254        impl<'de> serde::Deserialize<'de> for GeneratedField {
3255            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
3256            where
3257                D: serde::Deserializer<'de>,
3258            {
3259                struct GeneratedVisitor;
3260
3261                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3262                    type Value = GeneratedField;
3263
3264                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3265                        write!(formatter, "expected one of: {:?}", &FIELDS)
3266                    }
3267
3268                    #[allow(unused_variables)]
3269                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
3270                    where
3271                        E: serde::de::Error,
3272                    {
3273                        match value {
3274                            "epoch" => Ok(GeneratedField::Epoch),
3275                            "batchId" | "batch_id" => Ok(GeneratedField::BatchId),
3276                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
3277                        }
3278                    }
3279                }
3280                deserializer.deserialize_identifier(GeneratedVisitor)
3281            }
3282        }
3283        struct GeneratedVisitor;
3284        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3285            type Value = sink_writer_stream_response::BatchWrittenResponse;
3286
3287            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3288                formatter.write_str("struct connector_service.SinkWriterStreamResponse.BatchWrittenResponse")
3289            }
3290
3291            fn visit_map<V>(self, mut map_: V) -> std::result::Result<sink_writer_stream_response::BatchWrittenResponse, V::Error>
3292                where
3293                    V: serde::de::MapAccess<'de>,
3294            {
3295                let mut epoch__ = None;
3296                let mut batch_id__ = None;
3297                while let Some(k) = map_.next_key()? {
3298                    match k {
3299                        GeneratedField::Epoch => {
3300                            if epoch__.is_some() {
3301                                return Err(serde::de::Error::duplicate_field("epoch"));
3302                            }
3303                            epoch__ = 
3304                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
3305                            ;
3306                        }
3307                        GeneratedField::BatchId => {
3308                            if batch_id__.is_some() {
3309                                return Err(serde::de::Error::duplicate_field("batchId"));
3310                            }
3311                            batch_id__ = 
3312                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
3313                            ;
3314                        }
3315                    }
3316                }
3317                Ok(sink_writer_stream_response::BatchWrittenResponse {
3318                    epoch: epoch__.unwrap_or_default(),
3319                    batch_id: batch_id__.unwrap_or_default(),
3320                })
3321            }
3322        }
3323        deserializer.deserialize_struct("connector_service.SinkWriterStreamResponse.BatchWrittenResponse", FIELDS, GeneratedVisitor)
3324    }
3325}
3326impl serde::Serialize for sink_writer_stream_response::CommitResponse {
3327    #[allow(deprecated)]
3328    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
3329    where
3330        S: serde::Serializer,
3331    {
3332        use serde::ser::SerializeStruct;
3333        let mut len = 0;
3334        if self.epoch != 0 {
3335            len += 1;
3336        }
3337        if self.metadata.is_some() {
3338            len += 1;
3339        }
3340        let mut struct_ser = serializer.serialize_struct("connector_service.SinkWriterStreamResponse.CommitResponse", len)?;
3341        if self.epoch != 0 {
3342            #[allow(clippy::needless_borrow)]
3343            #[allow(clippy::needless_borrows_for_generic_args)]
3344            struct_ser.serialize_field("epoch", ToString::to_string(&self.epoch).as_str())?;
3345        }
3346        if let Some(v) = self.metadata.as_ref() {
3347            struct_ser.serialize_field("metadata", v)?;
3348        }
3349        struct_ser.end()
3350    }
3351}
3352impl<'de> serde::Deserialize<'de> for sink_writer_stream_response::CommitResponse {
3353    #[allow(deprecated)]
3354    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
3355    where
3356        D: serde::Deserializer<'de>,
3357    {
3358        const FIELDS: &[&str] = &[
3359            "epoch",
3360            "metadata",
3361        ];
3362
3363        #[allow(clippy::enum_variant_names)]
3364        enum GeneratedField {
3365            Epoch,
3366            Metadata,
3367        }
3368        impl<'de> serde::Deserialize<'de> for GeneratedField {
3369            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
3370            where
3371                D: serde::Deserializer<'de>,
3372            {
3373                struct GeneratedVisitor;
3374
3375                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3376                    type Value = GeneratedField;
3377
3378                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3379                        write!(formatter, "expected one of: {:?}", &FIELDS)
3380                    }
3381
3382                    #[allow(unused_variables)]
3383                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
3384                    where
3385                        E: serde::de::Error,
3386                    {
3387                        match value {
3388                            "epoch" => Ok(GeneratedField::Epoch),
3389                            "metadata" => Ok(GeneratedField::Metadata),
3390                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
3391                        }
3392                    }
3393                }
3394                deserializer.deserialize_identifier(GeneratedVisitor)
3395            }
3396        }
3397        struct GeneratedVisitor;
3398        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3399            type Value = sink_writer_stream_response::CommitResponse;
3400
3401            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3402                formatter.write_str("struct connector_service.SinkWriterStreamResponse.CommitResponse")
3403            }
3404
3405            fn visit_map<V>(self, mut map_: V) -> std::result::Result<sink_writer_stream_response::CommitResponse, V::Error>
3406                where
3407                    V: serde::de::MapAccess<'de>,
3408            {
3409                let mut epoch__ = None;
3410                let mut metadata__ = None;
3411                while let Some(k) = map_.next_key()? {
3412                    match k {
3413                        GeneratedField::Epoch => {
3414                            if epoch__.is_some() {
3415                                return Err(serde::de::Error::duplicate_field("epoch"));
3416                            }
3417                            epoch__ = 
3418                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
3419                            ;
3420                        }
3421                        GeneratedField::Metadata => {
3422                            if metadata__.is_some() {
3423                                return Err(serde::de::Error::duplicate_field("metadata"));
3424                            }
3425                            metadata__ = map_.next_value()?;
3426                        }
3427                    }
3428                }
3429                Ok(sink_writer_stream_response::CommitResponse {
3430                    epoch: epoch__.unwrap_or_default(),
3431                    metadata: metadata__,
3432                })
3433            }
3434        }
3435        deserializer.deserialize_struct("connector_service.SinkWriterStreamResponse.CommitResponse", FIELDS, GeneratedVisitor)
3436    }
3437}
3438impl serde::Serialize for sink_writer_stream_response::StartResponse {
3439    #[allow(deprecated)]
3440    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
3441    where
3442        S: serde::Serializer,
3443    {
3444        use serde::ser::SerializeStruct;
3445        let len = 0;
3446        let struct_ser = serializer.serialize_struct("connector_service.SinkWriterStreamResponse.StartResponse", len)?;
3447        struct_ser.end()
3448    }
3449}
3450impl<'de> serde::Deserialize<'de> for sink_writer_stream_response::StartResponse {
3451    #[allow(deprecated)]
3452    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
3453    where
3454        D: serde::Deserializer<'de>,
3455    {
3456        const FIELDS: &[&str] = &[
3457        ];
3458
3459        #[allow(clippy::enum_variant_names)]
3460        enum GeneratedField {
3461        }
3462        impl<'de> serde::Deserialize<'de> for GeneratedField {
3463            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
3464            where
3465                D: serde::Deserializer<'de>,
3466            {
3467                struct GeneratedVisitor;
3468
3469                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3470                    type Value = GeneratedField;
3471
3472                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3473                        write!(formatter, "expected one of: {:?}", &FIELDS)
3474                    }
3475
3476                    #[allow(unused_variables)]
3477                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
3478                    where
3479                        E: serde::de::Error,
3480                    {
3481                            Err(serde::de::Error::unknown_field(value, FIELDS))
3482                    }
3483                }
3484                deserializer.deserialize_identifier(GeneratedVisitor)
3485            }
3486        }
3487        struct GeneratedVisitor;
3488        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3489            type Value = sink_writer_stream_response::StartResponse;
3490
3491            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3492                formatter.write_str("struct connector_service.SinkWriterStreamResponse.StartResponse")
3493            }
3494
3495            fn visit_map<V>(self, mut map_: V) -> std::result::Result<sink_writer_stream_response::StartResponse, V::Error>
3496                where
3497                    V: serde::de::MapAccess<'de>,
3498            {
3499                while map_.next_key::<GeneratedField>()?.is_some() {
3500                    let _ = map_.next_value::<serde::de::IgnoredAny>()?;
3501                }
3502                Ok(sink_writer_stream_response::StartResponse {
3503                })
3504            }
3505        }
3506        deserializer.deserialize_struct("connector_service.SinkWriterStreamResponse.StartResponse", FIELDS, GeneratedVisitor)
3507    }
3508}
3509impl serde::Serialize for SourceType {
3510    #[allow(deprecated)]
3511    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
3512    where
3513        S: serde::Serializer,
3514    {
3515        let variant = match self {
3516            Self::Unspecified => "UNSPECIFIED",
3517            Self::Mysql => "MYSQL",
3518            Self::Postgres => "POSTGRES",
3519            Self::Citus => "CITUS",
3520            Self::Mongodb => "MONGODB",
3521            Self::SqlServer => "SQL_SERVER",
3522        };
3523        serializer.serialize_str(variant)
3524    }
3525}
3526impl<'de> serde::Deserialize<'de> for SourceType {
3527    #[allow(deprecated)]
3528    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
3529    where
3530        D: serde::Deserializer<'de>,
3531    {
3532        const FIELDS: &[&str] = &[
3533            "UNSPECIFIED",
3534            "MYSQL",
3535            "POSTGRES",
3536            "CITUS",
3537            "MONGODB",
3538            "SQL_SERVER",
3539        ];
3540
3541        struct GeneratedVisitor;
3542
3543        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3544            type Value = SourceType;
3545
3546            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3547                write!(formatter, "expected one of: {:?}", &FIELDS)
3548            }
3549
3550            fn visit_i64<E>(self, v: i64) -> std::result::Result<Self::Value, E>
3551            where
3552                E: serde::de::Error,
3553            {
3554                i32::try_from(v)
3555                    .ok()
3556                    .and_then(|x| x.try_into().ok())
3557                    .ok_or_else(|| {
3558                        serde::de::Error::invalid_value(serde::de::Unexpected::Signed(v), &self)
3559                    })
3560            }
3561
3562            fn visit_u64<E>(self, v: u64) -> std::result::Result<Self::Value, E>
3563            where
3564                E: serde::de::Error,
3565            {
3566                i32::try_from(v)
3567                    .ok()
3568                    .and_then(|x| x.try_into().ok())
3569                    .ok_or_else(|| {
3570                        serde::de::Error::invalid_value(serde::de::Unexpected::Unsigned(v), &self)
3571                    })
3572            }
3573
3574            fn visit_str<E>(self, value: &str) -> std::result::Result<Self::Value, E>
3575            where
3576                E: serde::de::Error,
3577            {
3578                match value {
3579                    "UNSPECIFIED" => Ok(SourceType::Unspecified),
3580                    "MYSQL" => Ok(SourceType::Mysql),
3581                    "POSTGRES" => Ok(SourceType::Postgres),
3582                    "CITUS" => Ok(SourceType::Citus),
3583                    "MONGODB" => Ok(SourceType::Mongodb),
3584                    "SQL_SERVER" => Ok(SourceType::SqlServer),
3585                    _ => Err(serde::de::Error::unknown_variant(value, FIELDS)),
3586                }
3587            }
3588        }
3589        deserializer.deserialize_any(GeneratedVisitor)
3590    }
3591}
3592impl serde::Serialize for TableSchema {
3593    #[allow(deprecated)]
3594    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
3595    where
3596        S: serde::Serializer,
3597    {
3598        use serde::ser::SerializeStruct;
3599        let mut len = 0;
3600        if !self.columns.is_empty() {
3601            len += 1;
3602        }
3603        if !self.pk_indices.is_empty() {
3604            len += 1;
3605        }
3606        let mut struct_ser = serializer.serialize_struct("connector_service.TableSchema", len)?;
3607        if !self.columns.is_empty() {
3608            struct_ser.serialize_field("columns", &self.columns)?;
3609        }
3610        if !self.pk_indices.is_empty() {
3611            struct_ser.serialize_field("pkIndices", &self.pk_indices)?;
3612        }
3613        struct_ser.end()
3614    }
3615}
3616impl<'de> serde::Deserialize<'de> for TableSchema {
3617    #[allow(deprecated)]
3618    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
3619    where
3620        D: serde::Deserializer<'de>,
3621    {
3622        const FIELDS: &[&str] = &[
3623            "columns",
3624            "pk_indices",
3625            "pkIndices",
3626        ];
3627
3628        #[allow(clippy::enum_variant_names)]
3629        enum GeneratedField {
3630            Columns,
3631            PkIndices,
3632        }
3633        impl<'de> serde::Deserialize<'de> for GeneratedField {
3634            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
3635            where
3636                D: serde::Deserializer<'de>,
3637            {
3638                struct GeneratedVisitor;
3639
3640                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3641                    type Value = GeneratedField;
3642
3643                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3644                        write!(formatter, "expected one of: {:?}", &FIELDS)
3645                    }
3646
3647                    #[allow(unused_variables)]
3648                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
3649                    where
3650                        E: serde::de::Error,
3651                    {
3652                        match value {
3653                            "columns" => Ok(GeneratedField::Columns),
3654                            "pkIndices" | "pk_indices" => Ok(GeneratedField::PkIndices),
3655                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
3656                        }
3657                    }
3658                }
3659                deserializer.deserialize_identifier(GeneratedVisitor)
3660            }
3661        }
3662        struct GeneratedVisitor;
3663        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3664            type Value = TableSchema;
3665
3666            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3667                formatter.write_str("struct connector_service.TableSchema")
3668            }
3669
3670            fn visit_map<V>(self, mut map_: V) -> std::result::Result<TableSchema, V::Error>
3671                where
3672                    V: serde::de::MapAccess<'de>,
3673            {
3674                let mut columns__ = None;
3675                let mut pk_indices__ = None;
3676                while let Some(k) = map_.next_key()? {
3677                    match k {
3678                        GeneratedField::Columns => {
3679                            if columns__.is_some() {
3680                                return Err(serde::de::Error::duplicate_field("columns"));
3681                            }
3682                            columns__ = Some(map_.next_value()?);
3683                        }
3684                        GeneratedField::PkIndices => {
3685                            if pk_indices__.is_some() {
3686                                return Err(serde::de::Error::duplicate_field("pkIndices"));
3687                            }
3688                            pk_indices__ = 
3689                                Some(map_.next_value::<Vec<::pbjson::private::NumberDeserialize<_>>>()?
3690                                    .into_iter().map(|x| x.0).collect())
3691                            ;
3692                        }
3693                    }
3694                }
3695                Ok(TableSchema {
3696                    columns: columns__.unwrap_or_default(),
3697                    pk_indices: pk_indices__.unwrap_or_default(),
3698                })
3699            }
3700        }
3701        deserializer.deserialize_struct("connector_service.TableSchema", FIELDS, GeneratedVisitor)
3702    }
3703}
3704impl serde::Serialize for ValidateSinkRequest {
3705    #[allow(deprecated)]
3706    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
3707    where
3708        S: serde::Serializer,
3709    {
3710        use serde::ser::SerializeStruct;
3711        let mut len = 0;
3712        if self.sink_param.is_some() {
3713            len += 1;
3714        }
3715        let mut struct_ser = serializer.serialize_struct("connector_service.ValidateSinkRequest", len)?;
3716        if let Some(v) = self.sink_param.as_ref() {
3717            struct_ser.serialize_field("sinkParam", v)?;
3718        }
3719        struct_ser.end()
3720    }
3721}
3722impl<'de> serde::Deserialize<'de> for ValidateSinkRequest {
3723    #[allow(deprecated)]
3724    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
3725    where
3726        D: serde::Deserializer<'de>,
3727    {
3728        const FIELDS: &[&str] = &[
3729            "sink_param",
3730            "sinkParam",
3731        ];
3732
3733        #[allow(clippy::enum_variant_names)]
3734        enum GeneratedField {
3735            SinkParam,
3736        }
3737        impl<'de> serde::Deserialize<'de> for GeneratedField {
3738            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
3739            where
3740                D: serde::Deserializer<'de>,
3741            {
3742                struct GeneratedVisitor;
3743
3744                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3745                    type Value = GeneratedField;
3746
3747                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3748                        write!(formatter, "expected one of: {:?}", &FIELDS)
3749                    }
3750
3751                    #[allow(unused_variables)]
3752                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
3753                    where
3754                        E: serde::de::Error,
3755                    {
3756                        match value {
3757                            "sinkParam" | "sink_param" => Ok(GeneratedField::SinkParam),
3758                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
3759                        }
3760                    }
3761                }
3762                deserializer.deserialize_identifier(GeneratedVisitor)
3763            }
3764        }
3765        struct GeneratedVisitor;
3766        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3767            type Value = ValidateSinkRequest;
3768
3769            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3770                formatter.write_str("struct connector_service.ValidateSinkRequest")
3771            }
3772
3773            fn visit_map<V>(self, mut map_: V) -> std::result::Result<ValidateSinkRequest, V::Error>
3774                where
3775                    V: serde::de::MapAccess<'de>,
3776            {
3777                let mut sink_param__ = None;
3778                while let Some(k) = map_.next_key()? {
3779                    match k {
3780                        GeneratedField::SinkParam => {
3781                            if sink_param__.is_some() {
3782                                return Err(serde::de::Error::duplicate_field("sinkParam"));
3783                            }
3784                            sink_param__ = map_.next_value()?;
3785                        }
3786                    }
3787                }
3788                Ok(ValidateSinkRequest {
3789                    sink_param: sink_param__,
3790                })
3791            }
3792        }
3793        deserializer.deserialize_struct("connector_service.ValidateSinkRequest", FIELDS, GeneratedVisitor)
3794    }
3795}
3796impl serde::Serialize for ValidateSinkResponse {
3797    #[allow(deprecated)]
3798    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
3799    where
3800        S: serde::Serializer,
3801    {
3802        use serde::ser::SerializeStruct;
3803        let mut len = 0;
3804        if self.error.is_some() {
3805            len += 1;
3806        }
3807        let mut struct_ser = serializer.serialize_struct("connector_service.ValidateSinkResponse", len)?;
3808        if let Some(v) = self.error.as_ref() {
3809            struct_ser.serialize_field("error", v)?;
3810        }
3811        struct_ser.end()
3812    }
3813}
3814impl<'de> serde::Deserialize<'de> for ValidateSinkResponse {
3815    #[allow(deprecated)]
3816    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
3817    where
3818        D: serde::Deserializer<'de>,
3819    {
3820        const FIELDS: &[&str] = &[
3821            "error",
3822        ];
3823
3824        #[allow(clippy::enum_variant_names)]
3825        enum GeneratedField {
3826            Error,
3827        }
3828        impl<'de> serde::Deserialize<'de> for GeneratedField {
3829            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
3830            where
3831                D: serde::Deserializer<'de>,
3832            {
3833                struct GeneratedVisitor;
3834
3835                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3836                    type Value = GeneratedField;
3837
3838                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3839                        write!(formatter, "expected one of: {:?}", &FIELDS)
3840                    }
3841
3842                    #[allow(unused_variables)]
3843                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
3844                    where
3845                        E: serde::de::Error,
3846                    {
3847                        match value {
3848                            "error" => Ok(GeneratedField::Error),
3849                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
3850                        }
3851                    }
3852                }
3853                deserializer.deserialize_identifier(GeneratedVisitor)
3854            }
3855        }
3856        struct GeneratedVisitor;
3857        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3858            type Value = ValidateSinkResponse;
3859
3860            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3861                formatter.write_str("struct connector_service.ValidateSinkResponse")
3862            }
3863
3864            fn visit_map<V>(self, mut map_: V) -> std::result::Result<ValidateSinkResponse, V::Error>
3865                where
3866                    V: serde::de::MapAccess<'de>,
3867            {
3868                let mut error__ = None;
3869                while let Some(k) = map_.next_key()? {
3870                    match k {
3871                        GeneratedField::Error => {
3872                            if error__.is_some() {
3873                                return Err(serde::de::Error::duplicate_field("error"));
3874                            }
3875                            error__ = map_.next_value()?;
3876                        }
3877                    }
3878                }
3879                Ok(ValidateSinkResponse {
3880                    error: error__,
3881                })
3882            }
3883        }
3884        deserializer.deserialize_struct("connector_service.ValidateSinkResponse", FIELDS, GeneratedVisitor)
3885    }
3886}
3887impl serde::Serialize for ValidateSourceRequest {
3888    #[allow(deprecated)]
3889    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
3890    where
3891        S: serde::Serializer,
3892    {
3893        use serde::ser::SerializeStruct;
3894        let mut len = 0;
3895        if self.source_id != 0 {
3896            len += 1;
3897        }
3898        if self.source_type != 0 {
3899            len += 1;
3900        }
3901        if !self.properties.is_empty() {
3902            len += 1;
3903        }
3904        if self.table_schema.is_some() {
3905            len += 1;
3906        }
3907        if self.is_source_job {
3908            len += 1;
3909        }
3910        if self.is_backfill_table {
3911            len += 1;
3912        }
3913        let mut struct_ser = serializer.serialize_struct("connector_service.ValidateSourceRequest", len)?;
3914        if self.source_id != 0 {
3915            #[allow(clippy::needless_borrow)]
3916            #[allow(clippy::needless_borrows_for_generic_args)]
3917            struct_ser.serialize_field("sourceId", ToString::to_string(&self.source_id).as_str())?;
3918        }
3919        if self.source_type != 0 {
3920            let v = SourceType::try_from(self.source_type)
3921                .map_err(|_| serde::ser::Error::custom(format!("Invalid variant {}", self.source_type)))?;
3922            struct_ser.serialize_field("sourceType", &v)?;
3923        }
3924        if !self.properties.is_empty() {
3925            struct_ser.serialize_field("properties", &self.properties)?;
3926        }
3927        if let Some(v) = self.table_schema.as_ref() {
3928            struct_ser.serialize_field("tableSchema", v)?;
3929        }
3930        if self.is_source_job {
3931            struct_ser.serialize_field("isSourceJob", &self.is_source_job)?;
3932        }
3933        if self.is_backfill_table {
3934            struct_ser.serialize_field("isBackfillTable", &self.is_backfill_table)?;
3935        }
3936        struct_ser.end()
3937    }
3938}
3939impl<'de> serde::Deserialize<'de> for ValidateSourceRequest {
3940    #[allow(deprecated)]
3941    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
3942    where
3943        D: serde::Deserializer<'de>,
3944    {
3945        const FIELDS: &[&str] = &[
3946            "source_id",
3947            "sourceId",
3948            "source_type",
3949            "sourceType",
3950            "properties",
3951            "table_schema",
3952            "tableSchema",
3953            "is_source_job",
3954            "isSourceJob",
3955            "is_backfill_table",
3956            "isBackfillTable",
3957        ];
3958
3959        #[allow(clippy::enum_variant_names)]
3960        enum GeneratedField {
3961            SourceId,
3962            SourceType,
3963            Properties,
3964            TableSchema,
3965            IsSourceJob,
3966            IsBackfillTable,
3967        }
3968        impl<'de> serde::Deserialize<'de> for GeneratedField {
3969            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
3970            where
3971                D: serde::Deserializer<'de>,
3972            {
3973                struct GeneratedVisitor;
3974
3975                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
3976                    type Value = GeneratedField;
3977
3978                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
3979                        write!(formatter, "expected one of: {:?}", &FIELDS)
3980                    }
3981
3982                    #[allow(unused_variables)]
3983                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
3984                    where
3985                        E: serde::de::Error,
3986                    {
3987                        match value {
3988                            "sourceId" | "source_id" => Ok(GeneratedField::SourceId),
3989                            "sourceType" | "source_type" => Ok(GeneratedField::SourceType),
3990                            "properties" => Ok(GeneratedField::Properties),
3991                            "tableSchema" | "table_schema" => Ok(GeneratedField::TableSchema),
3992                            "isSourceJob" | "is_source_job" => Ok(GeneratedField::IsSourceJob),
3993                            "isBackfillTable" | "is_backfill_table" => Ok(GeneratedField::IsBackfillTable),
3994                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
3995                        }
3996                    }
3997                }
3998                deserializer.deserialize_identifier(GeneratedVisitor)
3999            }
4000        }
4001        struct GeneratedVisitor;
4002        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
4003            type Value = ValidateSourceRequest;
4004
4005            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4006                formatter.write_str("struct connector_service.ValidateSourceRequest")
4007            }
4008
4009            fn visit_map<V>(self, mut map_: V) -> std::result::Result<ValidateSourceRequest, V::Error>
4010                where
4011                    V: serde::de::MapAccess<'de>,
4012            {
4013                let mut source_id__ = None;
4014                let mut source_type__ = None;
4015                let mut properties__ = None;
4016                let mut table_schema__ = None;
4017                let mut is_source_job__ = None;
4018                let mut is_backfill_table__ = None;
4019                while let Some(k) = map_.next_key()? {
4020                    match k {
4021                        GeneratedField::SourceId => {
4022                            if source_id__.is_some() {
4023                                return Err(serde::de::Error::duplicate_field("sourceId"));
4024                            }
4025                            source_id__ = 
4026                                Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0)
4027                            ;
4028                        }
4029                        GeneratedField::SourceType => {
4030                            if source_type__.is_some() {
4031                                return Err(serde::de::Error::duplicate_field("sourceType"));
4032                            }
4033                            source_type__ = Some(map_.next_value::<SourceType>()? as i32);
4034                        }
4035                        GeneratedField::Properties => {
4036                            if properties__.is_some() {
4037                                return Err(serde::de::Error::duplicate_field("properties"));
4038                            }
4039                            properties__ = Some(
4040                                map_.next_value::<std::collections::BTreeMap<_, _>>()?
4041                            );
4042                        }
4043                        GeneratedField::TableSchema => {
4044                            if table_schema__.is_some() {
4045                                return Err(serde::de::Error::duplicate_field("tableSchema"));
4046                            }
4047                            table_schema__ = map_.next_value()?;
4048                        }
4049                        GeneratedField::IsSourceJob => {
4050                            if is_source_job__.is_some() {
4051                                return Err(serde::de::Error::duplicate_field("isSourceJob"));
4052                            }
4053                            is_source_job__ = Some(map_.next_value()?);
4054                        }
4055                        GeneratedField::IsBackfillTable => {
4056                            if is_backfill_table__.is_some() {
4057                                return Err(serde::de::Error::duplicate_field("isBackfillTable"));
4058                            }
4059                            is_backfill_table__ = Some(map_.next_value()?);
4060                        }
4061                    }
4062                }
4063                Ok(ValidateSourceRequest {
4064                    source_id: source_id__.unwrap_or_default(),
4065                    source_type: source_type__.unwrap_or_default(),
4066                    properties: properties__.unwrap_or_default(),
4067                    table_schema: table_schema__,
4068                    is_source_job: is_source_job__.unwrap_or_default(),
4069                    is_backfill_table: is_backfill_table__.unwrap_or_default(),
4070                })
4071            }
4072        }
4073        deserializer.deserialize_struct("connector_service.ValidateSourceRequest", FIELDS, GeneratedVisitor)
4074    }
4075}
4076impl serde::Serialize for ValidateSourceResponse {
4077    #[allow(deprecated)]
4078    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
4079    where
4080        S: serde::Serializer,
4081    {
4082        use serde::ser::SerializeStruct;
4083        let mut len = 0;
4084        if self.error.is_some() {
4085            len += 1;
4086        }
4087        let mut struct_ser = serializer.serialize_struct("connector_service.ValidateSourceResponse", len)?;
4088        if let Some(v) = self.error.as_ref() {
4089            struct_ser.serialize_field("error", v)?;
4090        }
4091        struct_ser.end()
4092    }
4093}
4094impl<'de> serde::Deserialize<'de> for ValidateSourceResponse {
4095    #[allow(deprecated)]
4096    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
4097    where
4098        D: serde::Deserializer<'de>,
4099    {
4100        const FIELDS: &[&str] = &[
4101            "error",
4102        ];
4103
4104        #[allow(clippy::enum_variant_names)]
4105        enum GeneratedField {
4106            Error,
4107        }
4108        impl<'de> serde::Deserialize<'de> for GeneratedField {
4109            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
4110            where
4111                D: serde::Deserializer<'de>,
4112            {
4113                struct GeneratedVisitor;
4114
4115                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
4116                    type Value = GeneratedField;
4117
4118                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4119                        write!(formatter, "expected one of: {:?}", &FIELDS)
4120                    }
4121
4122                    #[allow(unused_variables)]
4123                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
4124                    where
4125                        E: serde::de::Error,
4126                    {
4127                        match value {
4128                            "error" => Ok(GeneratedField::Error),
4129                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
4130                        }
4131                    }
4132                }
4133                deserializer.deserialize_identifier(GeneratedVisitor)
4134            }
4135        }
4136        struct GeneratedVisitor;
4137        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
4138            type Value = ValidateSourceResponse;
4139
4140            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4141                formatter.write_str("struct connector_service.ValidateSourceResponse")
4142            }
4143
4144            fn visit_map<V>(self, mut map_: V) -> std::result::Result<ValidateSourceResponse, V::Error>
4145                where
4146                    V: serde::de::MapAccess<'de>,
4147            {
4148                let mut error__ = None;
4149                while let Some(k) = map_.next_key()? {
4150                    match k {
4151                        GeneratedField::Error => {
4152                            if error__.is_some() {
4153                                return Err(serde::de::Error::duplicate_field("error"));
4154                            }
4155                            error__ = map_.next_value()?;
4156                        }
4157                    }
4158                }
4159                Ok(ValidateSourceResponse {
4160                    error: error__,
4161                })
4162            }
4163        }
4164        deserializer.deserialize_struct("connector_service.ValidateSourceResponse", FIELDS, GeneratedVisitor)
4165    }
4166}
4167impl serde::Serialize for ValidationError {
4168    #[allow(deprecated)]
4169    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
4170    where
4171        S: serde::Serializer,
4172    {
4173        use serde::ser::SerializeStruct;
4174        let mut len = 0;
4175        if !self.error_message.is_empty() {
4176            len += 1;
4177        }
4178        let mut struct_ser = serializer.serialize_struct("connector_service.ValidationError", len)?;
4179        if !self.error_message.is_empty() {
4180            struct_ser.serialize_field("errorMessage", &self.error_message)?;
4181        }
4182        struct_ser.end()
4183    }
4184}
4185impl<'de> serde::Deserialize<'de> for ValidationError {
4186    #[allow(deprecated)]
4187    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
4188    where
4189        D: serde::Deserializer<'de>,
4190    {
4191        const FIELDS: &[&str] = &[
4192            "error_message",
4193            "errorMessage",
4194        ];
4195
4196        #[allow(clippy::enum_variant_names)]
4197        enum GeneratedField {
4198            ErrorMessage,
4199        }
4200        impl<'de> serde::Deserialize<'de> for GeneratedField {
4201            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
4202            where
4203                D: serde::Deserializer<'de>,
4204            {
4205                struct GeneratedVisitor;
4206
4207                impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
4208                    type Value = GeneratedField;
4209
4210                    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4211                        write!(formatter, "expected one of: {:?}", &FIELDS)
4212                    }
4213
4214                    #[allow(unused_variables)]
4215                    fn visit_str<E>(self, value: &str) -> std::result::Result<GeneratedField, E>
4216                    where
4217                        E: serde::de::Error,
4218                    {
4219                        match value {
4220                            "errorMessage" | "error_message" => Ok(GeneratedField::ErrorMessage),
4221                            _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
4222                        }
4223                    }
4224                }
4225                deserializer.deserialize_identifier(GeneratedVisitor)
4226            }
4227        }
4228        struct GeneratedVisitor;
4229        impl<'de> serde::de::Visitor<'de> for GeneratedVisitor {
4230            type Value = ValidationError;
4231
4232            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
4233                formatter.write_str("struct connector_service.ValidationError")
4234            }
4235
4236            fn visit_map<V>(self, mut map_: V) -> std::result::Result<ValidationError, V::Error>
4237                where
4238                    V: serde::de::MapAccess<'de>,
4239            {
4240                let mut error_message__ = None;
4241                while let Some(k) = map_.next_key()? {
4242                    match k {
4243                        GeneratedField::ErrorMessage => {
4244                            if error_message__.is_some() {
4245                                return Err(serde::de::Error::duplicate_field("errorMessage"));
4246                            }
4247                            error_message__ = Some(map_.next_value()?);
4248                        }
4249                    }
4250                }
4251                Ok(ValidationError {
4252                    error_message: error_message__.unwrap_or_default(),
4253                })
4254            }
4255        }
4256        deserializer.deserialize_struct("connector_service.ValidationError", FIELDS, GeneratedVisitor)
4257    }
4258}