Skip to content

Commit

Permalink
fix test added in previous commit (apache#31116)
Browse files Browse the repository at this point in the history
  • Loading branch information
benkonz authored Apr 26, 2024
1 parent 5c3786e commit 3e5a658
Showing 1 changed file with 13 additions and 17 deletions.
30 changes: 13 additions & 17 deletions sdks/python/apache_beam/io/avroio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
from apache_beam.io.avroio import avro_atomic_value_to_beam_atomic_value # For testing
from apache_beam.io.avroio import avro_union_type_to_beam_type # For testing
from apache_beam.io.avroio import beam_atomic_value_to_avro_atomic_value # For testing
from apache_beam.io.avroio import avro_dict_to_beam_row # For testing
from apache_beam.io.avroio import beam_row_to_avro_dict # For testing
from apache_beam.io.avroio import _create_avro_sink # For testing
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import StandardOptions
Expand Down Expand Up @@ -181,23 +183,17 @@ def test_avro_schema_to_beam_schema_with_nullable_atomic_fields(self):
records.append({
'name': 'Bruce', 'favorite_number': None, 'favorite_color': None
})
with tempfile.TemporaryDirectory() as tmp_dirname_input:
input_path = os.path.join(tmp_dirname_input, 'tmp_filename.avro')
parsed_schema = fastavro.parse_schema(json.loads(self.SCHEMA_STRING))
with open(input_path, 'wb') as tmp_avro_file:
fastavro.writer(tmp_avro_file, parsed_schema, records)

with tempfile.TemporaryDirectory() as tmp_dirname_output:

with TestPipeline() as p:
_ = (
p
| avroio.ReadFromAvro(input_path, as_rows=True)
| SqlTransform("SELECT * FROM PCOLLECTION")
| avroio.WriteToAvro(tmp_dirname_output))
with TestPipeline() as p:
readback = (p | avroio.ReadFromAvro(tmp_dirname_output + "*"))
assert_that(readback, equal_to(records))
avro_schema = fastavro.parse_schema(json.loads(self.SCHEMA_STRING))
beam_schema = avro_schema_to_beam_schema(avro_schema)

with TestPipeline() as p:
readback = (
p
| Create(records)
| beam.Map(avro_dict_to_beam_row(avro_schema, beam_schema))
| SqlTransform("SELECT * FROM PCOLLECTION")
| beam.Map(beam_row_to_avro_dict(avro_schema, beam_schema)))
assert_that(readback, equal_to(records))

def test_avro_atomic_value_to_beam_atomic_value(self):
input_outputs = [('int', 1, 1), ('int', -1, 0xffffffff),
Expand Down

0 comments on commit 3e5a658

Please sign in to comment.