diff --git a/fire-cloud/index.js b/fire-cloud/index.js index cbdb981..da49b2e 100644 --- a/fire-cloud/index.js +++ b/fire-cloud/index.js @@ -86,6 +86,7 @@ async function connectAndSubscribe() { const messageJson = JSON.parse(messageString); console.log(`Message received on ${topic}:`, messageJson); + const nodeId = messageJson.id; const temperature = messageJson.temp; const humidity = messageJson.humidity; const airQualityPpm = messageJson.air; @@ -109,9 +110,8 @@ async function connectAndSubscribe() { //invoke lambda function to calculate fire probability and update database const rowId = data[0].id; - const lambdaRes = await invokeAnalytics(rowId, temperature, flameSensorValue); + const lambdaRes = await invokeAnalytics(nodeId, rowId, temperature, flameSensorValue); - const nodeId = messageJson.id; const fireProbability = lambdaRes.fire_probability; if (fireProbability === null) { @@ -176,11 +176,12 @@ async function publishMessage(topic, message) { } // Function to invoke the Analytics lambda function to calculate the fire probability and update the database -async function invokeAnalytics(rowId, temp, flameValue) { +async function invokeAnalytics(nodeId, rowId, temp, flameValue) { console.log("Invoking lambda function..."); const command = new InvokeCommand({ FunctionName: "greendot-analytics", Payload: JSON.stringify({ + nodeId: nodeId, rowId: rowId, temp: temp, flame: flameValue, diff --git a/lambda/lambda_function.py b/lambda/lambda_function.py index e456aea..eb8b34e 100644 --- a/lambda/lambda_function.py +++ b/lambda/lambda_function.py @@ -11,6 +11,7 @@ PAST_RECORDS_DURATION = 3 # in minutes TODO: change such that we can sample >= 25 past records (e.g. sampling interval * 25) def lambda_handler(event, context): + nodeId = event.get('nodeId') rowId = event.get('rowId') temp = event.get('temp') flame = event.get('flame') @@ -18,7 +19,7 @@ def lambda_handler(event, context): # get past records from supabase temp_hum_aq_res = None try: - temp_hum_aq_res = supabase.rpc("get_past_records", {"interval_string": f"{PAST_RECORDS_DURATION} minutes", }).execute() + temp_hum_aq_res = supabase.rpc("get_past_records", {"interval_string": f"{PAST_RECORDS_DURATION} minutes", "node_id": nodeId}).execute() except Exception as e: print(f"Error getting past records supabase: {e}")