Skip to content

Commit

Permalink
Merge branch 'main' into prod-lambda
Browse files Browse the repository at this point in the history
  • Loading branch information
brycegoh committed Nov 25, 2023
2 parents f3a5aaa + 14e84fd commit 4f4c682
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 31 deletions.
72 changes: 53 additions & 19 deletions fire-cloud/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,18 @@ const REGION = "ap-southeast-1";
const CLIENT_ID = `FIRE-CLOUD`;
const SENSOR_DATA_TOPIC = "greendot/sensor/data";
const FLAME_PRESENCE_TOPIC = "greendot/status";
// const TIMEZONE_OFFSET = 8;

const aqDuration = null; // duration of air quality raising above threshold
/* e.g. node id 1 has status 0 */
const fireStatuses = {
0: 0,
1: 0,
};

/* e.g. node id 1 has null noFireDuration */
const noFireDurations = {
0: null,
1: null,
};

if (!SUPABASE_URL || !SUPABASE_API_KEY) {
console.error("Missing SUPABASE_URL or SUPABASE_API_KEY environment variable");
Expand Down Expand Up @@ -69,7 +78,6 @@ async function connectAndSubscribe() {
console.log("Connecting to AWS IoT Core...");
await connection.connect();
console.log("Connected to AWS IoT Core");
// const result = await invokeAnalytics(1, 2, 3, 4); //TODO: remove this

// Subscribe to a topic
await connection.subscribe(SENSOR_DATA_TOPIC, mqtt.QoS.AtLeastOnce, async (topic, payload) => {
Expand Down Expand Up @@ -104,34 +112,60 @@ async function connectAndSubscribe() {
await invokeAnalytics(rowId, temperature, flameSensorValue);

// START publish to flame presence topic to tune freq flow
/*
1. get the fire probablity from the database
2. validate and check if the fire probability is above threshold -> timestamp
3. publish to flame presence topic
*/
const { data: fireData, error: fireErr } = await supabase
.from("firecloud")
.select("fire_probability, node_id")
.eq("id", rowId);

// const randFlameStatus = Math.round(Math.random()); //random either 1 or 0 //TODO: remove and replace with real flame value
if (fireErr) {
console.error(fireErr);
return;
}
const nodeId = fireData[0].node_id;
const fireProbability = fireData[0].fire_probability;

// publishMessage(FLAME_PRESENCE_TOPIC, {
// status: randFlameStatus,
// });
await validateAndPublishFireMessage(nodeId, fireProbability);
});
}

//TODO: subscribe to supabase table for updates on the flag and timestamp -> validate and publish to MQTT topic
async function subscribeSupabase() {
const { data, error } = await supabase
.from("firecloud")
.on("*", (payload) => {
console.log("Change received!", payload);
})
.subscribe();
if (error) {
console.error(error);
return;
// fire status 0 = no fire, 1 = fire
async function validateAndPublishFireMessage(nodeId, fireProbability) {
const fireProbabilityThreshold = 0.3;

if (fireProbability > fireProbabilityThreshold) {
if (fireStatuses[nodeId] === 0) await publishMessage(FLAME_PRESENCE_TOPIC, { status: 1 });
fireStatuses[nodeId] = 1;
}

//if fire is dying down
if (fireStatuses[nodeId] === 1 && fireProbability < fireProbabilityThreshold) {
if (noFireDurations[nodeId] === null) {
noFireDurations[nodeId] = new Date().getTime();
} else {
const currentTime = new Date().getTime();
const timeDiff = currentTime - noFireDurations[nodeId];
const timeDiffInMinutes = timeDiff / 60000;

// if status has been 0 for more than 5 minutes -> there is no more fire -> publish status 0
if (timeDiffInMinutes > 5) {
await publishMessage(FLAME_PRESENCE_TOPIC, { status: 0 });
fireStatuses[nodeId] = 0;
noFireDurations[nodeId] = null;
}
}
}
console.log("Subscribed to supabase");
}

async function publishMessage(topic, message) {
const messageJson = JSON.stringify(message);
const messageBuffer = Buffer.from(messageJson);
try {
console.log(`Publishing message to ${topic}:`, messageJson);
await connection.publish(topic, messageBuffer, mqtt.QoS.AtLeastOnce);
} catch (e) {
console.error("Error publishing message:", e);
Expand Down
16 changes: 4 additions & 12 deletions lambda/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,14 @@
SUPABASE_KEY = os.environ.get("SUPABASE_KEY")

supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
try:
redisClient = redis.from_url(REDIS_ENDPOINT, decode_responses=True)
except Exception as e:
print(f"Error connecting to redis: {e}")

PAST_RECORDS_DURATION = 120 # in minutes

def lambda_handler(event, context):

try:
redisClient = redis.Redis(host=REDIS_ENDPOINT, port=6379, decode_responses=True)
except Exception as e:
return {
'statusCode': 500,
'body': json.dumps({
'error': 'Error connecting to redis',
'error_message': str(e)
})
}

rowId = event.get('rowId')
temp = event.get('temp')
flame = event.get('flame')
Expand Down

0 comments on commit 4f4c682

Please sign in to comment.