Skip to content

Commit

Permalink
- Added support for JSON batch encoding/decoding.
Browse files Browse the repository at this point in the history
  • Loading branch information
yuce committed Sep 30, 2020
1 parent 3d014fe commit be69f7c
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 16 deletions.
67 changes: 67 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,19 @@ Check out the [CloudEvents spec](https://github.com/cloudevents/spec/blob/v1.0/s
This package has no dependencies beyond the Python standard library with the base install.
Optionally depends on the `avro` package for Avro encode/decode functionality.

## Features

* Implements CloudEvents 1.0 spec.
* JSON and JSON batch encoding/decoding.
* Avro encoding/decoding.
* Simple API.

## News

### 0.2.3 - (*2020-09-30*)

* Added support for encoding/decoding batch events in JSON.

### 0.2.2 - (*2020-09-29*)

* First public release.
Expand All @@ -30,6 +41,8 @@ Install with JSON and Avro codecs:

## Usage:

### Creating Events

Create a CloudEvent with required attributes:

```python
Expand Down Expand Up @@ -101,6 +114,8 @@ Extension attributes can be accessed using the `attribute` method:
assert event.attribute("external1") == "foo/bar"
```

### Encoding/Decoding Events in JSON

Encode an event in JSON:

```python
Expand All @@ -111,6 +126,30 @@ encoded_event = Json.encode(event)

Note that blank fields won't be encoded.

Encode a batch of events in JSON:

```python
from spce import CloudEvent, Json

event_batch = [
CloudEvent(
type="OximeterMeasured",
source="oximeter/123",
id="1000",
datacontenttype="application/json",
data=r'{"spo2": 99})',
),
CloudEvent(
type="OximeterMeasured",
source="oximeter/123",
id="1001",
datacontenttype="application/json",
data=b'\x01binarydata\x02',
),
]
encoded_batch = Json.encode(event_batch)
```

Decode an event in JSON:

```python
Expand All @@ -132,6 +171,34 @@ text = """
decoded_event = Json.decode(text)
```

Decode a batch of events in JSON:

```python
text = r'''
[
{
"type":"OximeterMeasured",
"source":"oximeter/123",
"id":"1000",
"specversion":"1.0",
"datacontenttype": "application/json",
"data": "{\"spo2\": 99}"
},
{
"type":"OximeterMeasured",
"source":"oximeter/123",
"id":"1001",
"specversion":"1.0",
"datacontenttype": "application/json",
"data_base64": "AWJpbmFyeWRhdGEC"
}
]
'''
decoded_events = Json.decode(text)
```

### Encoding/Decoding Events in Avro

Encode an event in Avro:

```python
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

setup(
name='spce',
version='0.2.2',
version='0.2.3',
packages=['spce'],
url='https://github.com/scaleplandev/spce-python',
license='Apache 2.0',
Expand Down
45 changes: 30 additions & 15 deletions spce/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import json
from base64 import b64encode, b64decode
from typing import Union, Iterable

from .cloudevents import CloudEvent

Expand All @@ -25,24 +26,38 @@ class Json:
_ENCODER = json.JSONEncoder()

@classmethod
def encode(cls, event: CloudEvent):
kvs = []
encoder = cls._ENCODER
for attr, value in event._attributes.items():
if value:
kvs.append('"%s":%s' % (attr, encoder.encode(value)))
if event._data:
if event._has_binary_data:
kvs.append('"data_base64":%s' % encoder.encode(b64encode(event._data).decode()))
else:
kvs.append('"data":%s' % encoder.encode(event._data))
return "{%s}" % ",".join(kvs)
def encode(cls, event: Union[CloudEvent, Iterable[CloudEvent]]) -> str:
if isinstance(event, Iterable):
encoded = [cls.encode(e) for e in event]
return "[%s]" % ",".join(encoded)
elif isinstance(event, CloudEvent):
kvs = []
encoder = cls._ENCODER
for attr, value in event._attributes.items():
if value:
kvs.append('"%s":%s' % (attr, encoder.encode(value)))
if event._data:
if event._has_binary_data:
kvs.append('"data_base64":%s' % encoder.encode(b64encode(event._data).decode()))
else:
kvs.append('"data":%s' % encoder.encode(event._data))
return "{%s}" % ",".join(kvs)
else:
raise TypeError("JSON.encode cannot encode %s" % type(event))

@classmethod
def decode(cls, text: str) -> CloudEvent:
def decode(cls, text: str) -> Union[CloudEvent, Iterable[CloudEvent]]:
d = json.loads(text)
if isinstance(d, dict):
return CloudEvent(**cls._normalize_data(d))
elif isinstance(d, Iterable):
return [CloudEvent(**cls._normalize_data(it)) for it in d]
else:
raise TypeError("JSON.decode cannot decode %s" % type(d))

@classmethod
def _normalize_data(cls, d: dict) -> dict:
if "data_base64" in d:
d["data"] = b64decode(d["data_base64"])
del d["data_base64"]

return CloudEvent(**d)
return d
130 changes: 130 additions & 0 deletions tests/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,72 @@ def test_encode_extension_attribute(self):
'''
self.assertEqual(json.loads(target), json.loads(encoded))

def test_encode_batch_0_items(self):
self.assertEqual("[]", Json.encode([]))

def test_encode_batch_1_item(self):
event_batch = [
CloudEvent(
type="OximeterMeasured",
source="oximeter/123",
id="1000",
datacontenttype="application/json",
data=json.dumps({"spo2": 99}),
)
]
encoded_batch = Json.encode(event_batch)
target = r'''
[{
"type":"OximeterMeasured",
"source":"oximeter/123",
"id":"1000",
"specversion":"1.0",
"datacontenttype": "application/json",
"data": "{\"spo2\": 99}"
}]
'''
self.assertEqual(json.loads(target), json.loads(encoded_batch))

def test_encode_batch_2_items(self):
event_batch = [
CloudEvent(
type="OximeterMeasured",
source="oximeter/123",
id="1000",
datacontenttype="application/json",
data=json.dumps({"spo2": 99}),
),
CloudEvent(
type="OximeterMeasured",
source="oximeter/123",
id="1001",
datacontenttype="application/json",
data=b'\x01binarydata\x02',
),
]
encoded_batch = Json.encode(event_batch)
target = r'''
[
{
"type":"OximeterMeasured",
"source":"oximeter/123",
"id":"1000",
"specversion":"1.0",
"datacontenttype": "application/json",
"data": "{\"spo2\": 99}"
},
{
"type":"OximeterMeasured",
"source":"oximeter/123",
"id":"1001",
"specversion":"1.0",
"datacontenttype": "application/json",
"data_base64": "AWJpbmFyeWRhdGEC"
}
]
'''
self.assertEqual(json.loads(target), json.loads(encoded_batch))


class JsonDecoderTests(unittest.TestCase):

Expand Down Expand Up @@ -222,3 +288,67 @@ def test_decode_extension_attribute(self):
)
event = Json.decode(encoded_event)
self.assertEqual(target, event)

def test_decode_batch_0_items(self):
self.assertEqual([], Json.decode("[]"))

def test_decode_batch_1_item(self):
encoded_batch = r'''
[{
"type":"OximeterMeasured",
"source":"oximeter/123",
"id":"1000",
"specversion":"1.0",
"datacontenttype": "application/json",
"data": "{\"spo2\": 99}"
}]
'''
target = [
CloudEvent(
type="OximeterMeasured",
source="oximeter/123",
id="1000",
datacontenttype="application/json",
data=json.dumps({"spo2": 99}),
)
]
self.assertEqual(target, Json.decode(encoded_batch))

def test_decode_batch_2_items(self):
encoded_batch = r'''
[
{
"type":"OximeterMeasured",
"source":"oximeter/123",
"id":"1000",
"specversion":"1.0",
"datacontenttype": "application/json",
"data": "{\"spo2\": 99}"
},
{
"type":"OximeterMeasured",
"source":"oximeter/123",
"id":"1001",
"specversion":"1.0",
"datacontenttype": "application/json",
"data_base64": "AWJpbmFyeWRhdGEC"
}
]
'''
target = [
CloudEvent(
type="OximeterMeasured",
source="oximeter/123",
id="1000",
datacontenttype="application/json",
data=json.dumps({"spo2": 99}),
),
CloudEvent(
type="OximeterMeasured",
source="oximeter/123",
id="1001",
datacontenttype="application/json",
data=b'\x01binarydata\x02',
),
]
self.assertEqual(target, Json.decode(encoded_batch))

0 comments on commit be69f7c

Please sign in to comment.