Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

updating fix #570

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions src/reporter/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from geocoding.factory import get_geo_cache, is_geo_coding_available
from requests import RequestException
from translators.sql_translator import SQLTranslator
from utils.common import iter_entity_attrs, TIME_INDEX_NAME
from utils.common import iter_entity_attrs, TIME_INDEX_NAME, TIME_INDEX_ATTRIBUTE_NAME
import json
import logging
import requests
Expand All @@ -42,7 +42,7 @@
from exceptions.exceptions import NGSIUsageError, InvalidParameterValue, InvalidHeaderValue
from wq.ql.notify import InsertAction
from reporter.httputil import fiware_correlator, fiware_s, fiware_sp

from reporter.timex import select_time_index_attr

def log():
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -133,6 +133,7 @@ def _filter_empty_entities(payload):
def _filter_no_type_no_value_entities(payload):
attrs = list(iter_entity_attrs(payload))
attrs.remove('time_index')
attrs.remove('time_index_attribute')
for i in attrs:
attr = payload.get(i, {})
try:
Expand All @@ -143,7 +144,6 @@ def _filter_no_type_no_value_entities(payload):
# remove attributes without value or type
except Exception as e:
del payload[i]

return payload


Expand All @@ -170,6 +170,10 @@ def notify():
custom_index = request.headers.get(TIME_INDEX_HEADER_NAME, None)
entity[TIME_INDEX_NAME] = \
select_time_index_value_as_iso(custom_index, entity)
# Add TIME_INDEX_ATTRIBUTE_NAME
attr_name = iter_entity_attrs(entity)
entity[TIME_INDEX_ATTRIBUTE_NAME] = \
select_time_index_attr(attr_name, entity)
# Add GEO-DATE if enabled
if not entity.get(LOCATION_ATTR_NAME, None):
add_geodata(entity)
Expand Down
14 changes: 8 additions & 6 deletions src/reporter/timex.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from utils.timestr import latest_from_str_rep, to_datetime

TIME_INDEX_HEADER_NAME = 'Fiware-TimeIndex-Attribute'
TIME_INDEX_ATTRIBUTE_NAME = 'time_index_attribute'

MaybeString = Union[str, None]

Expand Down Expand Up @@ -61,7 +62,6 @@ def time_index_priority_list(
"""
# Custom time index attribute
yield to_datetime(_attribute(notification, custom_index))

# The most recent custom time index metadata
yield latest_from_str_rep(_iter_metadata(notification, custom_index))

Expand Down Expand Up @@ -104,7 +104,6 @@ def select_time_index_value(custom_index: str, notification: dict) -> datetime:
be converted to a ``datetime``. Items are considered from top to bottom,
so that if multiple values are present and they can all be converted to
``datetime``, the topmost value is chosen.

- Custom time index. The value of the ``TIME_INDEX_HEADER_NAME``. Note
that for a notification to contain such header, the corresponding
subscription has to be created with an ``httpCustom`` block as detailed
Expand All @@ -124,7 +123,6 @@ def select_time_index_value(custom_index: str, notification: dict) -> datetime:
- Current time. This is the default value we use if any of the above isn't
present or none of the values found can actually be converted to a
``datetime``.

:param custom_index: name of the custom_index (if requested,
None otherwise)
:param notification: the notification JSON payload as received from Orion.
Expand All @@ -136,10 +134,14 @@ def select_time_index_value(custom_index: str, notification: dict) -> datetime:
custom_index, notification):
if index_candidate:
return index_candidate

# use the current time as a last resort
return current_time


def select_time_index_attr(attr_name:str, notification: dict):
attr_names = []
for attr_name in iter_entity_attrs(notification):
if attr_name != 'time_index':
attr_names.append(attr_name)
return attr_names

def select_time_index_value_as_iso(custom_index: str, notification: dict) -> \
str:
Expand Down
6 changes: 3 additions & 3 deletions src/translators/base_translator.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from utils.common import TIME_INDEX_NAME

from utils.common import TIME_INDEX_NAME, TIME_INDEX_ATTRIBUTE_NAME

class BaseTranslator(object):
"""
Expand All @@ -11,7 +10,8 @@ class BaseTranslator(object):
# Note: Some databases will restrict the possible names for tables and
# columns.
TIME_INDEX_NAME = TIME_INDEX_NAME

TIME_INDEX_ATTRIBUTE_NAME = TIME_INDEX_ATTRIBUTE_NAME

def __init__(self, host, port, db_name):
self.host = host
self.port = port
Expand Down
5 changes: 3 additions & 2 deletions src/translators/crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from translators.errors import CrateErrorAnalyzer
from translators.sql_translator import NGSI_ISO8601, NGSI_DATETIME, \
NGSI_GEOJSON, NGSI_GEOPOINT, NGSI_TEXT, NGSI_STRUCTURED_VALUE, \
NGSI_LD_GEOMETRY, TIME_INDEX, METADATA_TABLE_NAME, FIWARE_SERVICEPATH
NGSI_LD_GEOMETRY, TIME_INDEX, TIME_INDEX_ATTRIBUTE, METADATA_TABLE_NAME, FIWARE_SERVICEPATH
import logging
from .crate_geo_query import from_ngsi_query
from utils.cfgreader import EnvReader, StrVar, IntVar, FloatVar
Expand All @@ -34,7 +34,8 @@
"Number": 'real',
NGSI_TEXT: 'text',
NGSI_STRUCTURED_VALUE: 'object',
TIME_INDEX: 'timestamptz'
TIME_INDEX: 'timestamptz',
TIME_INDEX_ATTRIBUTE: CRATE_ARRAY_STR
}

CRATE_TO_NGSI = dict((v, k) for (k, v) in NGSI_TO_SQL.items())
Expand Down
34 changes: 23 additions & 11 deletions src/translators/sql_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import dateutil.parser
from typing import Any, List, Optional, Sequence
from uuid import uuid4
from reporter.timex import select_time_index_attr

from cache.factory import get_cache, is_cache_available
from translators.insert_splitter import to_insert_batches
Expand All @@ -35,6 +36,7 @@
TENANT_PREFIX = 'mt'
TYPE_PREFIX = 'et'
TIME_INDEX = 'timeindex'
TIME_INDEX_ATTRIBUTE = 'time_index_attribute'
VALID_AGGR_METHODS = ['count', 'sum', 'avg', 'min', 'max']
VALID_AGGR_PERIODS = ['year', 'month', 'day', 'hour', 'minute', 'second']
# The name of the column where we store the original JSON entity received
Expand Down Expand Up @@ -63,7 +65,8 @@
NGSI_TEXT: 'text',
# NOT all databases supports JSON objects
NGSI_STRUCTURED_VALUE: 'text',
TIME_INDEX: 'timestamp WITH TIME ZONE NOT NULL'
TIME_INDEX: 'timestamp WITH TIME ZONE NOT NULL',
TIME_INDEX_ATTRIBUTE: 'array(string)'
}


Expand Down Expand Up @@ -269,18 +272,17 @@ def _insert_entities_of_type(self,
"It should have been inserted by the 'Reporter'. {}"
warnings.warn(msg.format(e))
e[self.TIME_INDEX_NAME] = current_timex()

if ORIGINAL_ENTITY_COL in e:
raise ValueError(
f"Entity {e[NGSI_ID]} has a reserved attribute name: " +
"'{ORIGINAL_ENTITY_COL_NAME}'")

# Define column types
# {column_name -> crate_column_type}
table = {
'entity_id': self.NGSI_TO_SQL['Text'],
'entity_type': self.NGSI_TO_SQL['Text'],
self.TIME_INDEX_NAME: self.NGSI_TO_SQL[TIME_INDEX],
self.TIME_INDEX_ATTRIBUTE_NAME: self.NGSI_TO_SQL[TIME_INDEX_ATTRIBUTE],
FIWARE_SERVICEPATH: self.NGSI_TO_SQL['Text'],
ORIGINAL_ENTITY_COL: self.NGSI_TO_SQL[NGSI_STRUCTURED_VALUE],
'instanceId': self.NGSI_TO_SQL['Text']
Expand All @@ -293,13 +295,13 @@ def _insert_entities_of_type(self,
'entity_id': (NGSI_ID, NGSI_TEXT),
self.TIME_INDEX_NAME: (self.TIME_INDEX_NAME, NGSI_DATETIME),
}

for e in entities:
entity_id = e.get('id')
for attr in iter_entity_attrs(e):
if attr == self.TIME_INDEX_NAME:
continue

if attr == self.TIME_INDEX_ATTRIBUTE_NAME:
continue
if isinstance(e[attr], dict) and 'type' in e[attr] \
and e[attr]['type'] != 'Property':
attr_t = e[attr]['type']
Expand Down Expand Up @@ -481,6 +483,8 @@ def _preprocess_values(self, e, original_attrs, col_names,
values.append(e['id'])
elif cn == self.TIME_INDEX_NAME:
values.append(e[self.TIME_INDEX_NAME])
elif cn == self.TIME_INDEX_ATTRIBUTE_NAME:
values.append(e[self.TIME_INDEX_ATTRIBUTE_NAME])
elif cn == FIWARE_SERVICEPATH:
values.append(fiware_servicepath or '')
elif cn == 'instanceId':
Expand Down Expand Up @@ -805,11 +809,10 @@ def _get_limit(self, limit, last_n):
f"last_n should be >=1 and <= {default_limit}.")
return min(last_n, limit)

def _get_where_clause(self, entity_ids, from_date, to_date, fiware_sp=None,
def _get_where_clause(self, attr_names, entity_ids, from_date, to_date, fiware_sp=None,
geo_query=None, prefix=''):
clauses = []
where_clause = ""

if entity_ids:
ids = ",".join("'{}'".format(e) for e in entity_ids)
clauses.append(" {}entity_id in ({}) ".format(prefix, ids))
Expand All @@ -819,7 +822,6 @@ def _get_where_clause(self, entity_ids, from_date, to_date, fiware_sp=None,
if to_date:
clauses.append(" {}{} <= '{}'".format(prefix, self.TIME_INDEX_NAME,
self._parse_date(to_date)))

if fiware_sp:
# Match prefix of fiware service path
if fiware_sp == '/':
Expand All @@ -832,13 +834,22 @@ def _get_where_clause(self, entity_ids, from_date, to_date, fiware_sp=None,
else:
# Match prefix of fiware service path
clauses.append(" " + prefix + FIWARE_SERVICEPATH + " = ''")
# TODO implement prefix also for geo_clause
geo_clause = self._get_geo_clause(geo_query)
attrs = ''
attrs_clauses = []
if attr_names:
attrs_clauses.append(" and ")
for a in attr_names:
attrs = '\'' + a + '\''
attrs_clauses.append(" " + attrs + " = any(time_index_attribute) or")
attrs_clauses.append(" time_index_attribute = NULL")
# TODO implement prefix also for geo_clause
if geo_clause:
clauses.append(geo_clause)

if len(clauses) > 0:
where_clause = "where" + " and ".join(clauses)
where_clause = "where" + " and ".join(clauses)
where_clause = where_clause + "".join(attrs_clauses)
return where_clause

@staticmethod
Expand Down Expand Up @@ -1078,7 +1089,8 @@ def query(self,
aggr_method,
aggr_period)
if not where_clause:
where_clause = self._get_where_clause(entity_ids,
where_clause = self._get_where_clause(lower_attr_names,
entity_ids,
from_date,
to_date,
fiware_servicepath,
Expand Down
6 changes: 4 additions & 2 deletions src/translators/tests/test_crate.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from translators.sql_translator import METADATA_TABLE_NAME, TYPE_PREFIX
from conftest import crate_translator as translator, entity
from utils.common import TIME_INDEX_NAME
from utils.common import TIME_INDEX_NAME, TIME_INDEX_ATTRIBUTE_NAME
from utils.tests.common import *
from datetime import datetime, timezone


def test_db_version(translator):
version = translator.get_db_version()
major = int(version.split('.')[0])
Expand All @@ -17,6 +16,7 @@ def test_geo_point(translator):
'id': 'Room1',
'type': 'Room',
TIME_INDEX_NAME: datetime.now(timezone.utc).isoformat(timespec='milliseconds'),
TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'],
'location': {
'type': 'geo:point',
'value': "19.6389474, -98.9109537" # lat, long
Expand Down Expand Up @@ -47,6 +47,7 @@ def test_geo_point_null_values(translator):
'id': 'Room1',
'type': 'Room',
TIME_INDEX_NAME: datetime.now(timezone.utc).isoformat(timespec='milliseconds'),
TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'],
'location': {
'type': 'geo:point',
'value': "19.6389474, -98.9109537" # lat, long
Expand All @@ -63,6 +64,7 @@ def test_geo_point_null_values(translator):
TIME_INDEX_NAME: datetime.now(
timezone.utc).isoformat(
timespec='milliseconds'),
TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'],
'temperature': {
'type': 'Number',
'value': 19}}
Expand Down
14 changes: 11 additions & 3 deletions src/translators/tests/test_insert.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ def test_insert_entity(translator, entity):
now = datetime.now(timezone.utc)
now_iso = now.isoformat(timespec='milliseconds')
entity[BaseTranslator.TIME_INDEX_NAME] = now_iso
entity[BaseTranslator.TIME_INDEX_ATTRIBUTE_NAME] = ['temperature','pressure']

result = translator.insert([entity])
assert result.rowcount != 0
Expand All @@ -274,7 +275,7 @@ def test_insert_same_entity_with_different_attrs(
# of temperature.
for entity in sameEntityWithDifferentAttrs:
entity[BaseTranslator.TIME_INDEX_NAME] = entity['temperature']['metadata']['dateModified']['value']

entity[BaseTranslator.TIME_INDEX_ATTRIBUTE_NAME] = ['temperature','pressure']
result = translator.insert(sameEntityWithDifferentAttrs)
assert result.rowcount != 0

Expand Down Expand Up @@ -521,6 +522,7 @@ def test_accept_unknown_ngsi_type(translator):
TIME_INDEX_NAME: datetime.now(
timezone.utc).isoformat(
timespec='milliseconds'),
TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'],
"address": {
"type": "PostalAddress",
"value": {
Expand Down Expand Up @@ -549,6 +551,7 @@ def test_accept_special_chars(translator):
TIME_INDEX_NAME: datetime.now(
timezone.utc).isoformat(
timespec='milliseconds'),
TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'],
"address": {
"type": "Address-Type",
"value": {
Expand All @@ -572,6 +575,7 @@ def test_missing_type_defaults_to_string(translator):
TIME_INDEX_NAME: datetime.now(
timezone.utc).isoformat(
timespec='milliseconds'),
TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'],
"foo": {
"value": "BaR",
},
Expand All @@ -590,7 +594,8 @@ def test_missing_type_defaults_to_string(translator):
def test_capitals(translator):
entity_type = "SoMeWeIrDtYpE"
e1 = {
"type": entity_type, "id": "sOmEwEiRdId", TIME_INDEX_NAME: datetime.now(
"type": entity_type, "id": "sOmEwEiRdId",TIME_INDEX_ATTRIBUTE_NAME:
['temperature','pressure'], TIME_INDEX_NAME: datetime.now(
timezone.utc).isoformat(
timespec='milliseconds'), "Foo": {
"type": "Text", "value": "FoO", }, "bAr": {
Expand All @@ -606,7 +611,7 @@ def test_capitals(translator):
e2['NewAttr'] = {"type": "Text", "value": "NewAttrValue!"}
e2[TIME_INDEX_NAME] = datetime.now(
timezone.utc).isoformat(timespec='milliseconds')

e2[TIME_INDEX_ATTRIBUTE_NAME]: ['temperature','pressure']
translator.insert([e2])
entities, err = translator.query()
assert len(entities) == 2
Expand Down Expand Up @@ -646,6 +651,7 @@ def test_long_json(translator):
big_entity = {
'id': 'entityId1',
'type': 'type1',
TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'],
TIME_INDEX_NAME: datetime.now(
timezone.utc).isoformat(
timespec='milliseconds'),
Expand All @@ -666,6 +672,7 @@ def test_structured_value_to_array(translator):
entity = {
'id': '8906',
'type': 'AirQualityObserved',
TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'],
TIME_INDEX_NAME: datetime.now(timezone.utc).isoformat(timespec='milliseconds'),
'aqi': {'type': 'Number', 'value': 43},
'city': {'type': 'Text', 'value': 'Antwerpen'},
Expand Down Expand Up @@ -699,6 +706,7 @@ def test_ISO8601(translator):
e = {
"type": "MyType",
"id": "MyId",
TIME_INDEX_ATTRIBUTE_NAME: ['temperature','pressure'],
TIME_INDEX_NAME: datetime.now(
timezone.utc).isoformat(
timespec='milliseconds'),
Expand Down
2 changes: 1 addition & 1 deletion src/utils/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
TIME_INDEX_NAME = 'time_index'

TIME_INDEX_ATTRIBUTE_NAME = 'time_index_attribute'

def entity_pk(entity):
"""
Expand Down