
Merge branch 'add-quality'
einarsi committed Jan 27, 2021
2 parents 16907d0 + 6823784 commit 69fa499
Showing 9 changed files with 503 additions and 68 deletions.
extratests (2 changes: 1 addition & 1 deletion)
tagreader/clients.py (40 changes: 29 additions & 11 deletions)
@@ -202,7 +202,7 @@ def get_handler(
if "AspenTech SQLplus" not in pyodbc.drivers():
raise RuntimeError(
"No Aspen SQLplus ODBC driver detected. Either switch to Web API "
"('aspenweb') or install appropriate driver."
"('aspenone') or install appropriate driver."
)
if host is None:
hostport = get_server_address_aspen(datasource)
@@ -276,19 +276,24 @@ def search(self, tag=None, desc=None):
def _get_metadata(self, tag):
return self.handler._get_tag_metadata(tag)

def _read_single_tag(self, tag, start_time, stop_time, ts, read_type, cache=None):
def _read_single_tag(
self, tag, start_time, stop_time, ts, read_type, get_status, cache=None
):
if read_type == ReaderType.SNAPSHOT:
metadata = self._get_metadata(tag)
df = self.handler.read_tag(
tag, start_time, stop_time, ts, read_type, metadata
tag, start_time, stop_time, ts, read_type, metadata, get_status
)
else:
stepped = False
status = False
missing_intervals = [(start_time, stop_time)]
df = pd.DataFrame()

if isinstance(cache, SmartCache) and read_type != ReaderType.RAW:
if (
isinstance(cache, SmartCache)
and read_type != ReaderType.RAW
and not get_status
):
time_slice = get_next_timeslice(start_time, stop_time, ts)
df = cache.fetch(
tag,
@@ -308,7 +313,7 @@ def _read_single_tag(self, tag, start_time, stop_time, ts, read_type, cache=None
readtype=read_type,
ts=ts,
stepped=stepped,
status=status,
status=get_status,
starttime=start_time,
endtime=stop_time,
)
@@ -317,7 +322,7 @@ def _read_single_tag(self, tag, start_time, stop_time, ts, read_type, cache=None
readtype=read_type,
ts=ts,
stepped=stepped,
status=status,
status=get_status,
starttime=start_time,
endtime=stop_time,
)
@@ -329,13 +334,13 @@ def _read_single_tag(self, tag, start_time, stop_time, ts, read_type, cache=None
for (start, stop) in missing_intervals:
while True:
df = self.handler.read_tag(
tag, start, stop, ts, read_type, metadata
tag, start, stop, ts, read_type, metadata, get_status
)
if len(df.index) > 0:
if cache is not None and read_type not in [
ReaderType.SNAPSHOT,
ReaderType.RAW,
]:
] and not get_status:
cache.store(df, read_type, ts)
frames.append(df)
if len(df) < self.handler._max_rows:
@@ -411,10 +416,17 @@ def read_tags(self, tags, start_time, stop_time, ts, read_type=ReaderType.INT):
end_time=stop_time,
ts=ts,
read_type=read_type,
get_status=False,
)

def read(
self, tags, start_time=None, end_time=None, ts=60, read_type=ReaderType.INT
self,
tags,
start_time=None,
end_time=None,
ts=60,
read_type=ReaderType.INT,
get_status=False,
):
"""Reads values for the specified [tags] from the IMS server for the
time interval from [start_time] to [stop_time] in intervals [ts].
@@ -449,7 +461,13 @@ def read(
for tag in tags:
cols.append(
self._read_single_tag(
tag, start_time, end_time, ts, read_type, cache=self.cache
tag,
start_time,
end_time,
ts,
read_type,
get_status,
cache=self.cache,
)
)
return pd.concat(cols, axis=1)
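Note (added for context, not part of the diff): with this merge, `read()` accepts `get_status=True` and returns one extra column per tag. A minimal usage sketch, assuming a reachable datasource; the datasource, imstype and tag name below are hypothetical:

```python
import tagreader

# Hypothetical source and tag; any supported imstype should behave the same.
c = tagreader.IMSClient("mysource", "pi")
c.connect()

df = c.read(
    ["MyTag"],
    start_time="2021-01-26 08:00:00",
    end_time="2021-01-26 09:00:00",
    ts=60,
    get_status=True,  # new in this merge; defaults to False
)
# Expected columns: "MyTag" (values) and "MyTag::status" (quality codes).
# Per the diff above, results are not cached when get_status=True.
```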
tagreader/odbc_handlers.py (92 changes: 72 additions & 20 deletions)
@@ -75,7 +75,9 @@ def generate_connection_string(self):
return self._connection_string

@staticmethod
def generate_read_query(tag, mapdef, start_time, stop_time, sample_time, read_type):
def generate_read_query(
tag, mapdef, start_time, stop_time, sample_time, read_type, get_status=False
):
if mapdef is None:
mapdef = {}
if read_type in [
@@ -86,20 +88,26 @@ def generate_read_query(tag, mapdef, start_time, stop_time, sample_time, read_ty
ReaderType.SUM,
ReaderType.SHAPEPRESERVING,
]:
raise (NotImplementedError)
raise NotImplementedError

# TODO: How to interpret ip_input_quality and ip_value_quality
# which use a different numeric schema (e.g. -1) than status from
# history table (0, 1, 2, 4, 5, 6)
if get_status and read_type == ReaderType.SNAPSHOT:
raise NotImplementedError

if read_type == ReaderType.SNAPSHOT and stop_time is not None:
raise NotImplementedError(
"Timestamp not supported for IP.21 ODBC connection using 'SNAPSHOT'. "
"Try 'piwebapi' instead."
"Try the web API 'aspenone' instead."
)

seconds = 0
if read_type != ReaderType.SNAPSHOT:
start_time = start_time.tz_convert("UTC")
stop_time = stop_time.tz_convert("UTC")
seconds = int(sample_time.total_seconds())
if ReaderType.SAMPLED == read_type:
if read_type == ReaderType.SAMPLED:
seconds = 0
else:
if seconds <= 0:
@@ -150,10 +158,15 @@ def generate_read_query(tag, mapdef, start_time, stop_time, sample_time, read_ty
ReaderType.SNAPSHOT: mapdef.get("MAP_CurrentValue", "IP_INPUT_VALUE"),
}.get(read_type, "value")

query = [
f'SELECT ISO8601({ts}) AS "time",',
f'{value} AS "value" FROM {from_column}',
]
status = {
ReaderType.SNAPSHOT: mapdef.get("MAP_CurrentQuality", "IP_INPUT_QUALITY"),
}.get(read_type, "status")

query = [f'SELECT ISO8601({ts}) AS "time", {value} AS "value"']
if get_status:
# status is returned/interpreted as char regardless if cast to INT
query.extend([f', {status} AS "status"'])
query.extend([f"FROM {from_column}"])

if ReaderType.SNAPSHOT != read_type:
start = start_time.strftime(timecast_format_query)
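For reference (not part of the commit): the list-append assembly above yields a SELECT of roughly the following shape when `get_status` is set. A standalone re-creation, with illustrative stand-ins for the values the real code resolves via `mapdef` and the read type:

```python
# Illustrative defaults: the real code resolves ts/value/status through
# mapdef lookups and picks from_column based on the read type.
ts, value, status, from_column = "ts", "value", "status", "history"
get_status = True

query = [f'SELECT ISO8601({ts}) AS "time", {value} AS "value"']
if get_status:
    # status is returned/interpreted as char regardless if cast to INT
    query.extend([f', {status} AS "status"'])
query.extend([f"FROM {from_column}"])

print(" ".join(query))
# SELECT ISO8601(ts) AS "time", value AS "value" , status AS "status" FROM history
```

The stray space before the comma comes from joining the list elements with spaces and is harmless in SQLplus.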
@@ -319,13 +332,24 @@ def _get_tag_description(self, tag):
res.description = ""
return res.description

def read_tag(self, tag, start_time, stop_time, sample_time, read_type, metadata):
def read_tag(
self,
tag,
start_time,
stop_time,
sample_time,
read_type,
metadata=None,
get_status=False,
):
# if get_status:
# raise NotImplementedError
(cleantag, mapping) = tag.split(";") if ";" in tag else (tag, None)
mapdef = dict()
if mapping is not None:
mapdef = self._get_specific_mapdef(cleantag, mapping)
query = self.generate_read_query(
cleantag, mapdef, start_time, stop_time, sample_time, read_type
cleantag, mapdef, start_time, stop_time, sample_time, read_type, get_status
)
# logging.debug(f'Executing SQL query {query!r}')
df = pd.read_sql(
@@ -337,9 +361,10 @@ def read_tag(self, tag, start_time, stop_time, sample_time, read_type, metadata)
# This warning will trigger also for (at least some) valid tags with no data.
# if len(df.index) == 0:
# warnings.warn(f"Tag {tag} not found")
if get_status:
df["status"] = df["status"].astype(int)
df = df.tz_localize("UTC")

return df.rename(columns={"value": tag})
return df.rename(columns={"value": tag, "status": tag + "::status"})

def query_sql(
self, query: str, parse: bool = True
@@ -394,7 +419,14 @@ def generate_search_query(tag=None, desc=None):
return " ".join(query)

def generate_read_query(
self, tag, start_time, stop_time, sample_time, read_type, metadata=None
self,
tag,
start_time,
stop_time,
sample_time,
read_type,
metadata=None,
get_status=False,
):
if read_type in [
ReaderType.COUNT,
@@ -409,7 +441,7 @@ def generate_read_query(
if read_type == ReaderType.SNAPSHOT and stop_time is not None:
raise NotImplementedError(
"Timestamp not supported for PI ODBC connection using 'SNAPSHOT'."
"Try 'piwebapi' instead."
"Try the web API 'piwebapi' instead."
)

seconds = 0
@@ -456,9 +488,12 @@ def generate_read_query(
query = ["SELECT CAST(value as FLOAT32)"]

# query.extend([f"AS value, FORMAT(time, '{timecast_format_output}') AS timestamp FROM {source} WHERE tag='{tag}'"]) # noqa E501
query.extend(
[f"AS value, time FROM {source} WHERE tag='{tag}'"] # __utctime also works
)
query.extend(["AS value,"])

if get_status:
query.extend(["status, questionable, substituted,"])

query.extend([f"time FROM {source} WHERE tag='{tag}'"]) # __utctime also works

if ReaderType.SNAPSHOT != read_type:
start = start_time.strftime(timecast_format_query)
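Likewise for PI (not part of the commit): a standalone sketch of the column-list assembly above. The `source` table name is an assumption for illustration; the real code selects it from the read type earlier in the function:

```python
# Illustrative stand-ins for values resolved earlier in generate_read_query.
source, tag, get_status = "[piarchive]..[picomp2]", "MyTag", True

query = ["SELECT CAST(value as FLOAT32)"]
query.extend(["AS value,"])
if get_status:
    query.extend(["status, questionable, substituted,"])
query.extend([f"time FROM {source} WHERE tag='{tag}'"])

print(" ".join(query))
# SELECT CAST(value as FLOAT32) AS value, status, questionable, substituted,
#   time FROM [piarchive]..[picomp2] WHERE tag='MyTag'
```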
@@ -538,15 +573,22 @@ def _is_summary(read_type):
return False

def read_tag(
self, tag, start_time, stop_time, sample_time, read_type, metadata=None
self,
tag,
start_time,
stop_time,
sample_time,
read_type,
metadata=None,
get_status=False,
):
if metadata is None:
# Tag not found
# TODO: Handle better and similarly across all handlers.
return pd.DataFrame()

query = self.generate_read_query(
tag, start_time, stop_time, sample_time, read_type
tag, start_time, stop_time, sample_time, read_type, get_status=get_status
)
# logging.debug(f'Executing SQL query {query!r}')
df = pd.read_sql(
@@ -573,7 +615,17 @@ def read_tag(
offset = [x[1] for x in digitalset]
df = df.replace(code, offset)

return df.rename(columns={"value": tag})
if get_status:
df["status"] = (
# questionable and substituted are boolean, but no need to .astype(int)
# status can be positive or negative for bad.
df["questionable"]
+ 2 * (df["status"] != 0)
+ 4 * df["substituted"]
)
df = df.drop(columns=["questionable", "substituted"])

return df.rename(columns={"value": tag, "status": tag + "::status"})

def query_sql(
self, query: str, parse: bool = True
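Note (added for context, not part of the diff): the PI handler folds the three quality columns into one small integer per row, with bit 0 set if `questionable`, bit 1 if `status` is non-zero (bad, whether positive or negative), and bit 2 if `substituted`. A toy round-trip of that encoding:

```python
import pandas as pd

# Toy frame mimicking the columns returned when get_status=True.
df = pd.DataFrame(
    {
        "value": [1.0, 2.0, 3.0],
        "status": [0, -3, 0],               # non-zero means bad
        "questionable": [False, False, True],
        "substituted": [False, True, False],
    }
)

# Same arithmetic as read_tag above: 1=questionable, 2=bad, 4=substituted.
df["status"] = df["questionable"] + 2 * (df["status"] != 0) + 4 * df["substituted"]
df = df.drop(columns=["questionable", "substituted"])

print(df["status"].tolist())  # [0, 6, 1]
```

So, for example, a value that is both bad and substituted gets status 6, matching the combined code the client exposes in the `tag::status` column.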