This repository has been archived by the owner on Dec 20, 2018. It is now read-only.

Row to AVRO for Streaming Usecase #274

Open
sunny1978 opened this issue Mar 15, 2018 · 2 comments

Comments

@sunny1978

sunny1978 commented Mar 15, 2018

Hi,
I have a use case where I need to convert a Spark Row to Avro and write the resulting bytes to a Kafka sink. Could you help?

Here is what I tried:

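// Derive an Avro schema from the Row's Spark schema (spark-avro SchemaConverters)
RecordBuilder<Schema> recordBuilder = SchemaBuilder.record(kafkaSinkConfig.getRecordName())
        .namespace(kafkaSinkConfig.getRecordNamespace());
Schema schema = SchemaConverters.convertStructToAvro(row.schema(), recordBuilder, kafkaSinkConfig.getRecordNamespace());
// Copy the Row's top-level values into the record (primitive fields only)
GenericData.Record avroRecord = new GenericData.Record(schema);
for (StructField field : row.schema().fields()) {
    avroRecord.put(field.name(), row.getAs(field.name()));
}
// Serialize with Twitter Bijection's GenericAvroCodecs and send to Kafka
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
byte[] data = recordInjection.apply(avroRecord);
kafkaProducerNew.send(new ProducerRecord<String, byte[]>(topicName, data));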

This works for flat Avro schemas, but fails when the schema contains nested fields (structs, or arrays of structs).
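A flat, field-by-field copy fails on nested data because a StructType value comes back from the Row as another Row, and an ArrayType value as a Scala Seq; the Avro writer cannot serialize either directly, so each nested level has to be converted into a record or a java.util.List of its own. The sketch below shows that recursive, schema-driven conversion in plain Java so it runs without Spark or Avro. The FieldType classes are hypothetical stand-ins for StructType/ArrayType, an Object[] stands in for a Row, and a LinkedHashMap stands in for GenericData.Record; it is an illustration of the technique, not the spark-avro converter.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of recursive Row -> record conversion (no Spark/Avro needed).
// HYPOTHETICAL stand-ins: FieldType classes ~ StructType/ArrayType,
// Object[] ~ a Spark Row, LinkedHashMap ~ GenericData.Record.
class NestedRowConverter {

    interface FieldType {}

    /** A leaf type such as StringType or LongType: values are copied unchanged. */
    static final class Primitive implements FieldType {}

    /** An array type: every element is converted with the element type. */
    static final class ArrayOf implements FieldType {
        final FieldType element;
        ArrayOf(FieldType element) { this.element = element; }
    }

    /** A struct type: parallel lists of field names and field types. */
    static final class Struct implements FieldType {
        final List<String> names;
        final List<FieldType> types;
        Struct(List<String> names, List<FieldType> types) {
            this.names = names;
            this.types = types;
        }
    }

    /** Converts a value according to its declared type, recursing into structs and arrays. */
    static Object convert(FieldType type, Object value) {
        if (value == null) {
            return null; // nullable fields stay null at every level
        }
        if (type instanceof Struct) {
            Struct struct = (Struct) type;
            Object[] row = (Object[]) value;
            Map<String, Object> record = new LinkedHashMap<>();
            for (int i = 0; i < struct.names.size(); i++) {
                // The recursive call is what a flat, top-level-only copy is missing
                record.put(struct.names.get(i), convert(struct.types.get(i), row[i]));
            }
            return record;
        }
        if (type instanceof ArrayOf) {
            FieldType element = ((ArrayOf) type).element;
            List<Object> converted = new ArrayList<>();
            for (Object item : (List<?>) value) {
                converted.add(convert(element, item));
            }
            return converted;
        }
        return value; // primitive leaf
    }
}
```

In a Spark version of this, the struct branch would read nested values with row.getStruct(i) and arrays with row.getList(i), and build each level as a GenericData.Record against the matching nested Avro schema (using the non-null branch when a field's schema is a nullable union). The guestProfile field in the schema below, an array of structs containing another struct, exercises all three branches.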

Write to Kafka (need help):
I am expecting an API like the one below. Exposing the private converter in AvroOutputWriter (the one used as converter(row).asInstanceOf[GenericRecord]) would probably be enough:
https://github.com/databricks/spark-avro/blob/branch-4.0/src/main/scala/com/databricks/spark/avro/AvroOutputWriter.scala

// Proposed public API (hypothetical; the converter is currently private)
GenericRecord avroRecord = AvroOutputWriter.convertToAvro(row);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
byte[] data = recordInjection.apply(avroRecord);
kafkaProducerNew.send(new ProducerRecord<String, byte[]>(topicName, data));

Read from Kafka:

// Subscribe to the topic(s) with String keys and byte[] (Avro) values
JavaInputDStream<ConsumerRecord<String, byte[]>> messages = KafkaUtils.createDirectStream(
        jsc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, byte[]>Subscribe(topics, kafkaParamsD));
JavaDStream<byte[]> dStream = messages.map(new Function<ConsumerRecord<String, byte[]>, byte[]>() {....});

I cannot find RddUtils.rddToDataFrame. Where is this API? Basically, I want to pass a byte[] and an Avro schema JSON string. Given these two, can I get a GenericRecord back?
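On the last question: with the Avro library itself, a byte[] plus the schema JSON should be enough to get a GenericRecord back (parse the JSON with new Schema.Parser().parse(json), then read with a GenericDatumReader<GenericRecord> over DecoderFactory.get().binaryDecoder(bytes, null)); the Bijection injection used for writing also offers invert(bytes). The schema is required because Avro binary data contains only the field values, in schema order, with no field names or type tags. The pure-JDK sketch below decodes a flat record of long and string fields to make that concrete. The class name and the parallel names/types lists are hypothetical stand-ins for a parsed schema; nullable unions, nested records and the other Avro types are deliberately left out.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of Avro binary decoding for a flat record of "long"/"string" fields.
// HYPOTHETICAL: names/types lists stand in for the parsed writer schema.
class AvroFlatDecoder {

    /** Avro "long": variable-length, 7 bits per byte, then zig-zag decoded. */
    static long readLong(ByteArrayInputStream in) {
        long raw = 0;
        int shift = 0;
        int b;
        do {
            b = in.read();
            if (b < 0) throw new IllegalArgumentException("unexpected end of data");
            raw |= (long) (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0); // high bit set: more bytes follow
        return (raw >>> 1) ^ -(raw & 1); // undo zig-zag: 1 -> -1, 2 -> 1, ...
    }

    /** Avro "string": byte length as a long, then that many UTF-8 bytes. */
    static String readString(ByteArrayInputStream in) {
        int length = (int) readLong(in);
        if (length < 0) throw new IllegalArgumentException("negative string length");
        if (length == 0) return "";
        byte[] bytes = new byte[length];
        if (in.read(bytes, 0, length) != length) throw new IllegalArgumentException("truncated string");
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Reads the fields back-to-back in schema order; the bytes carry no names. */
    static Map<String, Object> decode(byte[] data, List<String> names, List<String> types) {
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        Map<String, Object> record = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++) {
            String type = types.get(i);
            if (type.equals("long")) {
                record.put(names.get(i), readLong(in));
            } else if (type.equals("string")) {
                record.put(names.get(i), readString(in));
            } else {
                throw new IllegalArgumentException("unsupported type: " + type);
            }
        }
        return record;
    }
}
```

For example, the bytes 02 06 41 6E 6E decode to guestIdentifier = 1 and guestFirstName = "Ann" when the schema lists a long guestIdentifier followed by a string guestFirstName; with the field order swapped, the same bytes decode to wrong values without any error, which is why the reader needs the writer's schema.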

@sunny1978
Author

@gsolasab, @squito, @gengliangwang, @rxin
Could someone look into this request? We have a use case for writing Avro records to Kafka.
We read data from Kafka as String/ASCII, perform ETL operations such as extracting zipped content and joining against lookup data, and finally build a DataFrame of nested rows, which we write back to Kafka as Avro.
For this I am looking for a generic way to convert a Row to an Avro byte[] so that I can write it to Kafka.
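For the write side described here, GenericAvroCodecs.toBinary (or Avro's own GenericDatumWriter with a BinaryEncoder) produces the byte[]; the pure-JDK sketch below only shows what those bytes contain for a flat record whose fields are all nullable, as every field in the Row schema posted in this issue is. Per the Avro specification, fields are written in schema order with no names, and a union value is written as its zero-based branch index followed by the value. The sketch ASSUMES each nullable field is the union [<type>, "null"] (branch 0 = value, branch 1 = null); that branch order is an assumption, so check the generated schema's JSON (schema.toString()) before relying on it. The class name and the List<Object> input are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Sketch of the Avro binary encoding of one flat record whose fields are all nullable.
// ASSUMPTION: each field is the union [<type>, "null"], so branch 0 carries a value
// and branch 1 marks null. HYPOTHETICAL class; real code would use the Avro library.
class AvroNullableEncoder {

    /** Avro "long"/"int": zig-zag, then 7 bits per byte, low-order group first. */
    static void writeLong(ByteArrayOutputStream out, long value) {
        long n = (value << 1) ^ (value >> 63); // zig-zag maps -1 -> 1, 1 -> 2, ...
        while ((n & ~0x7FL) != 0) {
            out.write((int) ((n & 0x7F) | 0x80)); // high bit set: more bytes follow
            n >>>= 7;
        }
        out.write((int) n);
    }

    /** Avro "string": byte length as a long, then the UTF-8 bytes. */
    static void writeString(ByteArrayOutputStream out, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        writeLong(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    /** Writes each field in schema order: union branch index, then the value (if any). */
    static byte[] encode(List<Object> values) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (Object value : values) {
            if (value == null) {
                writeLong(out, 1); // branch 1 = "null", which has no payload
            } else if (value instanceof Long) {
                writeLong(out, 0);
                writeLong(out, (Long) value);
            } else if (value instanceof String) {
                writeLong(out, 0);
                writeString(out, (String) value);
            } else {
                throw new IllegalArgumentException("unsupported type: " + value.getClass());
            }
        }
        return out.toByteArray();
    }
}
```

For example, encoding the values 1L and "Ann" gives the bytes 00 02 00 06 41 6E 6E, while encoding null and "Ann" gives 02 00 06 41 6E 6E. A nested converter has to emit these union indexes at every level too, which is one more reason a generic Row-to-Avro helper is easier to reuse than to rewrite.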

@sunny1978
Author

My Row schema (row.schema()):
StructType(
StructField(masterReservation,StructType(
StructField(cancellationLocalTimestamp,StringType,true),
StructField(cancellationNumber,StringType,true),
StructField(confirmationNumber,StringType,true),
StructField(confirmationUtcDate,StringType,true),
StructField(createdTimestamp,StringType,true),
StructField(guestProfile,ArrayType(StructType(
StructField(guestFirstName,StringType,true),
StructField(guestIdentifier,LongType,true),
StructField(guestLastName,StringType,true),
StructField(guestMember,StructType(
StructField(memberIdentifier,StringType,true)),true)),true),true),
StructField(imageCommittedSequenceNumber,LongType,true),
StructField(imageIdentifier,LongType,true),
StructField(imageStatusCode,StringType,true),
StructField(imageUtcTimestamp,StringType,true),
StructField(pmsConfirmationNumber,StringType,true),
StructField(ratePlan,ArrayType(StructType(
StructField(priceGridCode,StringType,true),
StructField(ratePlanCode,StringType,true),
StructField(ratePlanName,StringType,true)),true),true),
StructField(reservationActivityType,StringType,true),
StructField(reservationChannel,ArrayType(StructType(
StructField(channelCode,StringType,true),
StructField(channelRoleCode,StringType,true),
StructField(channelSubCode,StringType,true),
StructField(iataNumber,StringType,true),
StructField(vendorCode,StringType,true),
StructField(vendorType,ArrayType(LongType,true),true)),true),true),
StructField(reservationProduct,StructType(
StructField(prod_desc,StringType,true),
StructField(rms_prod_class_cd,StringType,true)),true),
StructField(reservationProperty,StructType(
StructField(brandName,StringType,true),
StructField(crsBrandCode,StringType,true),
StructField(hotelFactText,ArrayType(StringType,true),true),
StructField(propertyChainCode,StringType,true),
StructField(propertyCode,StringType,true),
StructField(propertyName,StringType,true)),true),
StructField(reservationSegment,ArrayType(StructType(
StructField(averageBaseRateAmount,StringType,true),
StructField(checkInDate,StringType,true),
StructField(checkOutDate,StringType,true),
StructField(localCurrencyCode,StringType,true),
StructField(loyaltyGuestIdentifier,LongType,true),
StructField(propertyCode,StringType,true),
StructField(ratePlanCode,StringType,true),
StructField(reservationHolderGuestIdentifier,LongType,true),
StructField(reservationProductOffer,StructType(
StructField(businessProductCode,StringType,true),
StructField(guestAdultQuantity,LongType,true),
StructField(guestChildQuantity,LongType,true),
StructField(productQuantity,LongType,true)),true),
StructField(segConsolidatedStatusCode,StringType,true),
StructField(segmentActionStatusCode,StringType,true),
StructField(segmentDailyRate,ArrayType(StructType(
StructField(dailyRateEndDate,StringType,true),
StructField(dailyRateStartDate,StringType,true),
StructField(extraPersonAfterTaxAmount,StringType,true),
StructField(occupancyBaseAfterTaxAmount,StringType,true),
StructField(occupancyBaseRateAmount,StringType,true),
StructField(totalServiceChargeAmount,StringType,true),
StructField(totalTaxAmount,StringType,true)),true),true),
StructField(segmentGroupCode,StringType,true),
StructField(segmentIdentifier,LongType,true),
StructField(segmentPmsStatusCode,StringType,true),
StructField(segmentSequenceNumber,LongType,true)),true),true)),true))
