Skip to content

Commit

Permalink
Plain JSON Mode
Browse files Browse the repository at this point in the history
  • Loading branch information
clemensv committed Apr 30, 2024
1 parent abf9b88 commit befb6da
Show file tree
Hide file tree
Showing 35 changed files with 1,481 additions and 291 deletions.
19 changes: 2 additions & 17 deletions lang/csharp/src/apache/main/Generic/GenericReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ public object Read(object reuse, Schema writerSchema, Schema readerSchema, Decod
case Schema.Type.Union:
return ReadUnion(reuse, (UnionSchema)writerSchema, readerSchema, d);
case Schema.Type.Logical:
return ReadLogical(reuse, (LogicalSchema)writerSchema, readerSchema, d);
LogicalSchema writerLogicalSchema = writerSchema as LogicalSchema;
return Read<object>(writerSchema.Tag, readerSchema, ()=>d.ReadLogicalTypeValue(writerLogicalSchema));
default:
throw new AvroException("Unknown schema type: " + writerSchema);
}
Expand Down Expand Up @@ -552,22 +553,6 @@ protected virtual object ReadUnion(object reuse, UnionSchema writerSchema, Schem
return Read(reuse, ws, readerSchema, d);
}

/// <summary>
/// Deserializes an object based on the writer's logical schema. Uses the underlying logical type to convert
/// the value to the logical type.
/// </summary>
/// <param name="reuse">If appropriate, uses this object instead of creating a new one.</param>
/// <param name="writerSchema">The UnionSchema that the writer used.</param>
/// <param name="readerSchema">The schema the reader uses.</param>
/// <param name="d">The decoder for serialization.</param>
/// <returns>The deserialized object.</returns>
protected virtual object ReadLogical(object reuse, LogicalSchema writerSchema, Schema readerSchema, Decoder d)
{
LogicalSchema ls = (LogicalSchema)readerSchema;

return writerSchema.LogicalType.ConvertToLogicalValue(Read(reuse, writerSchema.BaseSchema, ls.BaseSchema, d), ls);
}

/// <summary>
/// Deserializes a fixed object and returns the object. The default implementation uses CreateFixed()
/// and GetFixedBuffer() and returns what CreateFixed() returned.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private ReadItem ResolveReader(Schema writerSchema, Schema readerSchema)
case Schema.Type.Union:
return ResolveUnion((UnionSchema)writerSchema, readerSchema);
case Schema.Type.Logical:
return ResolveLogical((LogicalSchema)writerSchema, (LogicalSchema)readerSchema);
return Read(d=>d.ReadLogicalTypeValue((LogicalSchema)writerSchema));
default:
throw new AvroException("Unknown schema type: " + writerSchema);
}
Expand Down Expand Up @@ -400,12 +400,6 @@ private object ReadArray(object reuse, Decoder decoder, ArrayAccess arrayAccess,
return array;
}

private ReadItem ResolveLogical(LogicalSchema writerSchema, LogicalSchema readerSchema)
{
var baseReader = ResolveReader(writerSchema.BaseSchema, readerSchema.BaseSchema);
return (r, d) => readerSchema.LogicalType.ConvertToLogicalValue(baseReader(r, d), readerSchema);
}

private ReadItem ResolveFixed(FixedSchema writerSchema, FixedSchema readerSchema)
{
if (readerSchema.Size != writerSchema.Size)
Expand Down
15 changes: 3 additions & 12 deletions lang/csharp/src/apache/main/Generic/PreresolvingDatumWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using Avro.Util;
using Encoder = Avro.IO.Encoder;

namespace Avro.Generic
Expand Down Expand Up @@ -100,7 +102,7 @@ private WriteItem ResolveWriter( Schema schema )
case Schema.Type.Union:
return ResolveUnion((UnionSchema)schema);
case Schema.Type.Logical:
return ResolveLogical((LogicalSchema)schema);
return (v,e) => Write<object>(v, schema.Tag, (w)=>e.WriteLogicalTypeValue(w, (LogicalSchema)schema));
default:
return (v, e) => Error(schema, v);
}
Expand Down Expand Up @@ -235,17 +237,6 @@ private void WriteArray(WriteItem itemWriter, object array, Encoder encoder)
encoder.WriteArrayEnd();
}

/// <summary>
/// Serializes a logical value object by using the underlying logical type to convert the value
/// to its base value.
/// </summary>
/// <param name="schema">The logical schema.</param>
protected WriteItem ResolveLogical(LogicalSchema schema)
{
var baseWriter = ResolveWriter(schema.BaseSchema);
return (d, e) => baseWriter(schema.LogicalType.ConvertToBaseValue(d, schema), e);
}

private WriteItem ResolveMap(MapSchema mapSchema)
{
var itemWriter = ResolveWriter(mapSchema.ValueSchema);
Expand Down
23 changes: 23 additions & 0 deletions lang/csharp/src/apache/main/IO/BinaryDecoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,29 @@ public void ReadFixed(byte[] buffer, int start, int length)
Read(buffer, start, length);
}

/// <inheritdoc/>
public object ReadLogicalTypeValue(LogicalSchema logicalSchema)
{
Schema baseSchema = logicalSchema.BaseSchema;
switch(baseSchema.Tag)
{
case Schema.Type.Int:
return logicalSchema.LogicalType.ConvertToLogicalValue(ReadInt(), logicalSchema);
case Schema.Type.Long:
return logicalSchema.LogicalType.ConvertToLogicalValue(ReadLong(), logicalSchema);
case Schema.Type.Bytes:
return logicalSchema.LogicalType.ConvertToLogicalValue(ReadBytes(), logicalSchema);
case Schema.Type.String:
return logicalSchema.LogicalType.ConvertToLogicalValue(ReadString(), logicalSchema);
case Schema.Type.Fixed:
byte[] fixedValue = new byte[((FixedSchema)baseSchema).Size];
ReadFixed(fixedValue);
return logicalSchema.LogicalType.ConvertToLogicalValue(fixedValue, logicalSchema);
default:
throw new AvroException($"Unsupported logical type: {logicalSchema.Tag}");
}
}

/// <summary>
/// Skips over a null value.
/// </summary>
Expand Down
32 changes: 32 additions & 0 deletions lang/csharp/src/apache/main/IO/BinaryEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,38 @@ public void WriteFixed(byte[] data, int start, int len)
stream.Write(data, start, len);
}

/// <inheritdoc/>
public void WriteLogicalTypeValue(object value, LogicalSchema schema)
{
var baseValue = schema.LogicalType.ConvertToBaseValue(value, schema);
switch (baseValue)
{
case int i:
WriteInt(i);
break;
case long l:
WriteLong(l);
break;
case float f:
WriteFloat(f);
break;
case double d:
WriteDouble(d);
break;
case byte[] bytes:
WriteBytes(bytes);
break;
case string s:
WriteString(s);
break;
case Avro.Generic.GenericFixed fixedValue:
WriteFixed(fixedValue.Value);
break;
default:
throw new AvroTypeException($"Unsupported conversion from {baseValue.GetType()}");
}
}

private void writeBytes(byte[] bytes)
{
stream.Write(bytes, 0, bytes.Length);
Expand Down
8 changes: 8 additions & 0 deletions lang/csharp/src/apache/main/IO/Decoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ public interface Decoder
/// <param name="length">Number of bytes to read</param>
void ReadFixed(byte[] buffer, int start, int length);


/// <summary>
/// Reads a logical type value.
/// </summary>
/// <param name="schema">Schema of the logical type</param>
/// <returns></returns>
object ReadLogicalTypeValue(LogicalSchema schema);

/// <summary>
/// Skips a null Avro type on the stream.
/// </summary>
Expand Down
7 changes: 7 additions & 0 deletions lang/csharp/src/apache/main/IO/Encoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ public interface Encoder
/// <param name="len">Number of bytes to write.</param>
void WriteFixed(byte[] data, int start, int len);

/// <summary>
/// Writes a logical type value
/// </summary>
/// <param name="value">Value to be written</param>
/// <param name="schema">Logical type schema</param>
void WriteLogicalTypeValue(object value, LogicalSchema schema);

/// <summary>
/// Flushes the encoder.
/// </summary>
Expand Down
Loading

0 comments on commit befb6da

Please sign in to comment.