Skip to content

Commit

Permalink
feature: use data type from signalk-schema (#14)
Browse files Browse the repository at this point in the history
Use @signalk/signalk-schema.getUnits to derive data type in InfluxDb:
if the schema has unit for the path and the unit is not a timestamp
force numeric data type in Influxdb.

This will prevent value type related errors where the first value that
is written to Influx has a bogus data type and all subsequent values
just create errors because the type that ended in Influx is off.

Fixes #7.
  • Loading branch information
tkurki authored Mar 1, 2023
1 parent 128be9a commit 27589d0
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 28 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"prettier": "prettier -w src/",
"lint": "eslint -c .eslintrc.js --ext .ts --ext .js --fix src/",
"format": "npm run prettier && npm run lint",
"ci-lint": "eslint -c .eslintrc.js --ext .ts --ext .js src/ && prettier --check src/",
"ci-lint": "eslint -c .eslintrc.js --ext .ts --ext .js src/ && prettier --check src/",
"generate-schema": "./generate-schema >dist/PluginConfig.json",
"build": "tsc && npm run generate-schema",
"prepublishOnly": "npm install && npm run build"
Expand Down Expand Up @@ -47,6 +47,7 @@
"@influxdata/influxdb-client-apis": "^1.29.0",
"@js-joda/core": "^5.3.0",
"@js-joda/timezone": "^2.12.1",
"@signalk/signalk-schema": "^1.6.0",
"s2-geometry": "^1.2.10"
}
}
97 changes: 72 additions & 25 deletions src/influx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import { SKContext } from '@chacal/signalk-ts'
import { HttpError, InfluxDB, Point, QueryApi, WriteApi, WriteOptions } from '@influxdata/influxdb-client'
import { BucketsAPI, OrgsAPI } from '@influxdata/influxdb-client-apis'
import { getUnits } from '@signalk/signalk-schema'

import { Logging, QueryParams } from './plugin'
import { S2 } from 's2-geometry'
Expand Down Expand Up @@ -58,6 +59,17 @@ interface PathValue {

export const influxPath = (path: string) => (path !== '' ? path : '<empty>')

enum JsValueType {
number = 'number',
string = 'string',
boolean = 'boolean',
object = 'object',
}

const VALUETYPECACHE: {
[key: string]: JsValueType
} = {}

export class SKInflux {
private influx: InfluxDB
public org: string
Expand Down Expand Up @@ -95,31 +107,11 @@ export class SKInflux {
}

handleValue(context: SKContext, isSelf: boolean, source: string, pathValue: PathValue) {
const point = new Point(influxPath(pathValue.path)).tag('context', context).tag('source', source)
if (isSelf) {
point.tag(SELF_TAG_NAME, SELF_TAG_VALUE)
}
if (pathValue.path === 'navigation.position') {
point.floatField('lat', pathValue.value.latitude)
point.floatField('lon', pathValue.value.longitude)
point.tag('s2_cell_id', posToS2CellId(pathValue.value))
} else {
switch (typeof pathValue.value) {
case 'number':
point.floatField('value', pathValue.value)
break
case 'string':
point.stringField('value', pathValue.value)
break
case 'boolean':
point.booleanField('value', pathValue.value)
break
case 'object':
point.stringField('value', JSON.stringify(pathValue.value))
}
}
const point = toPoint(context, isSelf, source, pathValue, this.logging.debug)
this.logging.debug(point)
this.writeApi.writePoint(point)
if (point) {
this.writeApi.writePoint(point)
}
}

flush() {
Expand All @@ -133,11 +125,66 @@ export class SKInflux {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
getSelfValues(params: Omit<QueryParams, 'context'>): Promise<Array<any>> {
const query = paramsToQuery(this.bucket, params)
console.log(query)
// console.log(query)
return this.queryApi.collectRows(query)
}
}

const toPoint = (
context: SKContext,
isSelf: boolean,
source: string,
pathValue: PathValue,
debug: (s: string) => void,
) => {
const point = new Point(influxPath(pathValue.path)).tag('context', context).tag('source', source)
if (isSelf) {
point.tag(SELF_TAG_NAME, SELF_TAG_VALUE)
}
if (pathValue.path === 'navigation.position') {
point.floatField('lat', pathValue.value.latitude)
point.floatField('lon', pathValue.value.longitude)
point.tag('s2_cell_id', posToS2CellId(pathValue.value))
} else {
const valueType = typeFor(pathValue)
try {
switch (valueType) {
case JsValueType.number:
point.floatField('value', pathValue.value)
break
case JsValueType.string:
point.stringField('value', pathValue.value)
break
case JsValueType.boolean:
point.booleanField('value', pathValue.value)
break
case JsValueType.object:
point.stringField('value', JSON.stringify(pathValue.value))
}
} catch (e) {
debug(`Error creating point ${pathValue.path}:${pathValue.value} => ${valueType}`)
return undefined
}
}
return point
}

const typeFor = (pathValue: PathValue): JsValueType => {
let r = VALUETYPECACHE[pathValue.path]
if (!r) {
r = VALUETYPECACHE[pathValue.path] = _typeFor(pathValue)
}
return r
}

const _typeFor = (pathValue: PathValue): JsValueType => {
const unit = getUnits(`vessels.self.${pathValue.path}`)
if (unit && unit !== 'RFC 3339 (UTC)') {
return JsValueType.number
}
return typeof pathValue.value as JsValueType
}

const paramsToQuery = (bucket: string, params: PartialBy<QueryParams, 'context'>) => {
const contextTagClause = params.context
? `and r.context == "${params.context}"`
Expand Down
43 changes: 41 additions & 2 deletions src/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ describe('Plugin', () => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unused-vars
debug: function (...args: any): void {
// eslint-disable-next-line no-console
console.log(args)
console.log(`debug:${args}`)
},
// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unused-vars
error: function (...args: any): void {
// eslint-disable-next-line no-console
console.log(args)
console.error(`debug:${args}`)
},
signalk: new EventEmitter(),
selfId,
Expand Down Expand Up @@ -200,4 +200,43 @@ describe('Plugin', () => {
interval: 50,
})
})

it('Uses data types from schema, initial null value', async () => assertNumericAfterFirstOtherValue(null))
it('Uses data types from schema, , initial string value', async () =>
assertNumericAfterFirstOtherValue('first string value'))

const assertNumericAfterFirstOtherValue = (firstValue: string | null) => {
const NUMERICSCHEMAPATH = 'navigation.headingTrue'
;[firstValue, 3.14, null, 'last string value'].forEach((value) =>
app.signalk.emit('delta', {
context: TESTCONTEXT,
updates: [
{
$source: TESTSOURCE,
timestamp: new Date('2022-08-17T17:01:00Z'),
values: [
{
path: NUMERICSCHEMAPATH,
value,
},
],
},
],
}),
)

const assertData = () =>
plugin.flush().then(() =>
plugin
.getSelfValues({
paths: [NUMERICSCHEMAPATH],
influxIndex: 0,
})
.then((rows) => expect(rows.length).to.equal(1)),
)
return retry(assertData, [null], {
retriesMax: 5,
interval: 50,
})
}
})
1 change: 1 addition & 0 deletions src/signalk__signalk-schema/index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
declare module '@signalk/signalk-schema'

0 comments on commit 27589d0

Please sign in to comment.