Skip to content

Commit

Permalink
feat(devices): add API to query for senML import logs
Browse files Browse the repository at this point in the history
  • Loading branch information
coderbyheart committed Apr 15, 2024
1 parent 170a1df commit f7dad8a
Show file tree
Hide file tree
Showing 13 changed files with 390 additions and 92 deletions.
1 change: 1 addition & 0 deletions cdk/BackendLambdas.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ type BackendLambdas = {
createCredentials: PackedLambda
openSSL: PackedLambda
senMLToLwM2M: PackedLambda
senMLImportLogs: PackedLambda
}
2 changes: 2 additions & 0 deletions cdk/baseLayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ const dependencies: Array<keyof (typeof pJson)['dependencies']> = [
'@aws-lambda-powertools/metrics',
'@hello.nrfcloud.com/nrfcloud-api-helpers',
'@hello.nrfcloud.com/lambda-helpers',
'id128',
'p-retry',
]

export const pack = async (): Promise<PackedLayer> =>
Expand Down
1 change: 1 addition & 0 deletions cdk/packBackendLambdas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ export const packBackendLambdas = async (): Promise<BackendLambdas> => ({
createCredentials: await pack('createCredentials'),
openSSL: await pack('openSSL'),
senMLToLwM2M: await pack('senMLToLwM2M'),
senMLImportLogs: await pack('senMLImportLogs'),
})
39 changes: 38 additions & 1 deletion cdk/resources/SenMLMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@ import {
aws_iam as IAM,
aws_iot as IoT,
aws_lambda as Lambda,
aws_logs as Logs,
RemovalPolicy,
Stack,
} from 'aws-cdk-lib'
import { Construct } from 'constructs'
import type { BackendLambdas } from '../BackendLambdas.js'
import type { PublicDevices } from './PublicDevices.js'
import { RetentionDays } from 'aws-cdk-lib/aws-logs'

/**
* Handle incoming SenML messages
*/
export class SenMLMessages extends Construct {
public readonly importLogsFn: Lambda.IFunction
constructor(
parent: Construct,
{
Expand All @@ -24,12 +29,19 @@ export class SenMLMessages extends Construct {
publicDevices,
}: {
baseLayer: Lambda.ILayerVersion
lambdaSources: Pick<BackendLambdas, 'senMLToLwM2M'>
lambdaSources: Pick<BackendLambdas, 'senMLToLwM2M' | 'senMLImportLogs'>
publicDevices: PublicDevices
},
) {
super(parent, 'senml-messages')

const importLogs = new Logs.LogGroup(this, 'importLogs', {
logGroupName: `${Stack.of(this).stackName}/senml-device-message-import`,
retention: RetentionDays.ONE_MONTH,
logGroupClass: Logs.LogGroupClass.INFREQUENT_ACCESS,
removalPolicy: RemovalPolicy.DESTROY,
})

const fn = new Lambda.Function(this, 'fn', {
handler: lambdaSources.senMLToLwM2M.handler,
architecture: Lambda.Architecture.ARM_64,
Expand All @@ -43,6 +55,7 @@ export class SenMLMessages extends Construct {
environment: {
VERSION: this.node.getContext('version'),
PUBLIC_DEVICES_TABLE_NAME: publicDevices.publicDevicesTable.tableName,
IMPORT_LOGGROUP_NAME: importLogs.logGroupName,
},
initialPolicy: [
new IAM.PolicyStatement({
Expand All @@ -53,6 +66,7 @@ export class SenMLMessages extends Construct {
...new LambdaLogGroup(this, 'fnLogs'),
})
publicDevices.publicDevicesTable.grantReadData(fn)
importLogs.grantWrite(fn)

const rule = new IoT.CfnTopicRule(this, 'rule', {
topicRulePayload: {
Expand Down Expand Up @@ -86,5 +100,28 @@ export class SenMLMessages extends Construct {
) as IAM.IPrincipal,
sourceArn: rule.attrArn,
})

this.importLogsFn = new Lambda.Function(this, 'importLogsFn', {
handler: lambdaSources.senMLImportLogs.handler,
architecture: Lambda.Architecture.ARM_64,
runtime: Lambda.Runtime.NODEJS_20_X,
timeout: Duration.minutes(1),
memorySize: 1792,
code: Lambda.Code.fromAsset(lambdaSources.senMLImportLogs.zipFile),
description: 'Returns the last import messages for a device.',
layers: [baseLayer],
environment: {
VERSION: this.node.getContext('version'),
IMPORT_LOGGROUP_NAME: importLogs.logGroupName,
},
...new LambdaLogGroup(this, 'importLogsFnLogs'),
initialPolicy: [
new IAM.PolicyStatement({
actions: ['logs:StartQuery', 'logs:GetQueryResults'],
resources: [importLogs.logGroupArn],
}),
],
})
importLogs.grantRead(this.importLogsFn)
}
}
10 changes: 7 additions & 3 deletions cdk/stacks/BackendStack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,29 @@ export class BackendStack extends Stack {

const publicDevices = new PublicDevices(this)

const api = new API(this)

new LwM2MShadow(this, {
baseLayer,
lambdaSources,
publicDevices,
})

new SenMLMessages(this, {
const senMLMessages = new SenMLMessages(this, {
baseLayer,
lambdaSources,
publicDevices,
})
api.addRoute(
'GET /device/{id}/senml-import-logs',
senMLMessages.importLogsFn,
)

new ConnectionInformationGeoLocation(this, {
baseLayer,
lambdaSources,
})

const api = new API(this)

const shareAPI = new ShareAPI(this, {
baseLayer,
lambdaSources,
Expand Down
5 changes: 5 additions & 0 deletions cli/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { configureNrfCloudAccount } from './commands/configure-nrfcloud-account.
import { logsCommand } from './commands/logs.js'
import { CloudWatchLogsClient } from '@aws-sdk/client-cloudwatch-logs'
import { configureHello } from './commands/configure-hello.js'
import { shareDevice } from './commands/share-device.js'

const ssm = new SSMClient({})
const db = new DynamoDBClient({})
Expand Down Expand Up @@ -72,6 +73,10 @@ const CLI = async ({ isCI }: { isCI: boolean }) => {
env: accountEnv,
stackName: STACK_NAME,
}),
shareDevice({
db,
publicDevicesTableName: mapOutputs.publicDevicesTableName,
}),
)
} catch (error) {
console.warn(chalk.yellow('⚠️'), chalk.yellow((error as Error).message))
Expand Down
10 changes: 1 addition & 9 deletions cli/commands/register-custom-device.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,12 @@ export const registerCustomMapDevice = ({
deviceId,
model,
email,
generateToken: () => '123456',
confirmed: true,
})
if ('error' in maybePublished) {
console.error(maybePublished.error)
throw new Error(`Failed to register custom device.`)
}
const maybeConfirmed = await publicDevice.confirmOwnership({
deviceId,
ownershipConfirmationToken: '123456',
})
if ('error' in maybeConfirmed) {
console.error(maybeConfirmed.error)
throw new Error(`Failed to confirm custom device.`)
}

const certDir = path.join(
process.cwd(),
Expand Down
43 changes: 43 additions & 0 deletions cli/commands/share-device.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import type { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import { models } from '@hello.nrfcloud.com/proto-map'
import chalk from 'chalk'
import { publicDevicesRepo } from '../../sharing/publicDevicesRepo.js'
import type { CommandDefinition } from './CommandDefinition.js'

const modelIDs = Object.keys(models)

export const shareDevice = ({
db,
publicDevicesTableName,
}: {
db: DynamoDBClient
publicDevicesTableName: string
}): CommandDefinition => ({
command: `share-device <deviceId> <model> <email>`,
action: async (deviceId, model, email) => {
console.log(publicDevicesTableName)
if (!modelIDs.includes(model))
throw new Error(
`Unknown model ${model}. Known models are ${modelIDs.join(', ')}.`,
)
if (!/.+@.+/.test(email)) {
throw new Error(`Must provide valid email.`)
}
console.debug(chalk.yellow('Device ID:'), chalk.blue(deviceId))
const publicDevice = publicDevicesRepo({
db,
TableName: publicDevicesTableName,
})
const maybePublished = await publicDevice.share({
deviceId,
model,
email,
confirmed: true,
})
if ('error' in maybePublished) {
console.error(maybePublished.error)
throw new Error(`Failed to share device.`)
}
},
help: 'Shares an existing device to be shown on the map',
})
121 changes: 121 additions & 0 deletions lambda/senMLImportLogs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import {
CloudWatchLogsClient,
GetQueryResultsCommand,
QueryStatus,
StartQueryCommand,
type ResultField,
} from '@aws-sdk/client-cloudwatch-logs'
import { aProblem } from '@hello.nrfcloud.com/lambda-helpers/aProblem'
import { aResponse } from '@hello.nrfcloud.com/lambda-helpers/aResponse'
import { addVersionHeader } from '@hello.nrfcloud.com/lambda-helpers/addVersionHeader'
import { corsOPTIONS } from '@hello.nrfcloud.com/lambda-helpers/corsOPTIONS'
import {
formatTypeBoxErrors,
validateWithTypeBox,
} from '@hello.nrfcloud.com/proto'
import { Context, DeviceId } from '@hello.nrfcloud.com/proto-map/api'
import middy from '@middy/core'
import { fromEnv } from '@nordicsemiconductor/from-env'
import { Type } from '@sinclair/typebox'
import type {
APIGatewayProxyEventV2,
APIGatewayProxyResultV2,
} from 'aws-lambda'
import pRetry from 'p-retry'

const { importLogGroupName, version } = fromEnv({
importLogGroupName: 'IMPORT_LOGGROUP_NAME',
version: 'VERSION',
})(process.env)

const logs = new CloudWatchLogsClient({})

const validateInput = validateWithTypeBox(
Type.Object({
id: DeviceId,
}),
)

const h = async (
event: APIGatewayProxyEventV2,
): Promise<APIGatewayProxyResultV2> => {
const maybeValidQuery = validateInput(event.pathParameters)

if ('errors' in maybeValidQuery) {
return aProblem({
title: 'Validation failed',
status: 400,
detail: formatTypeBoxErrors(maybeValidQuery.errors),
})
}

const queryString = `filter @logStream LIKE '${maybeValidQuery.value.id}-'
| fields @timestamp, @message
| sort @timestamp desc
| limit 100`
const { queryId } = await logs.send(
new StartQueryCommand({
logGroupName: importLogGroupName,
queryString,
startTime: Date.now() - 24 * 60 * 60 * 1000,
endTime: Date.now(),
}),
)
console.debug({ queryId, queryString })

const results = await pRetry(
async () => {
const result = await logs.send(
new GetQueryResultsCommand({
queryId,
}),
)
switch (result.status) {
case QueryStatus.Cancelled:
return []
case QueryStatus.Complete:
return result.results
case QueryStatus.Failed:
console.error(`Query failed!`)
return []
case QueryStatus.Timeout:
console.error(`Query timed out!`)
return []
case QueryStatus.Running:
case QueryStatus.Scheduled:
throw new Error(`Running!`)
case QueryStatus.Unknown:
default:
console.debug('Unknown query status.')
return []
}
},
{
factor: 1,
minTimeout: 1000,
retries: 10,
},
)

return aResponse(
200,
{
'@context': Context.named('senml-import-logs'),
results: (results ?? []).map((fields) => {
const result = JSON.parse((fields[1] as ResultField).value as string)
return {
...result,
ts: new Date(
(fields[0] as ResultField).value as string,
).toISOString(),
}
}),
},
60,
)
}

export const handler = middy()
.use(addVersionHeader(version))
.use(corsOPTIONS('GET'))
.handler(h)
Loading

0 comments on commit f7dad8a

Please sign in to comment.