diff --git a/src/main/java/com/shipmentEvents/demo/EventHandler.java b/src/main/java/com/shipmentEvents/demo/EventHandler.java new file mode 100644 index 00000000..aa77b007 --- /dev/null +++ b/src/main/java/com/shipmentEvents/demo/EventHandler.java @@ -0,0 +1,195 @@ +package com.shipmentEvents.handlers; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import javax.crypto.Cipher; +import javax.crypto.SecretKey; +import javax.crypto.spec.SecretKeySpec; + +import com.amazonaws.regions.Regions; +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.LambdaLogger; +import com.amazonaws.services.lambda.runtime.events.ScheduledEvent; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.shipmentEvents.util.Constants; +import com.shopify.ShopifySdk; +import com.shopify.model.ShopifyShop; + +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.commons.lang3.tuple.Pair; + + +public class EventHandler implements RequestHandler { + + /** + * Shipment events for a carrier are uploaded to separate S3 buckets based on the source of events. E.g., events originating from + * the hand-held scanner are stored in a separate bucket than the ones from mobile App. The Lambda processes events from multiple + * sources and updates the latest status of the package in a summary S3 bucket every 15 minutes. + * + * The events are stored in following format: + * - Each status update is a file, where the name of the file is tracking number + random id. + * - Each file has status and time-stamp as the first 2 lines respectively. + * - The time at which the file is stored in S3 is not an indication of the time-stamp of the event. + * - Once the status is marked as DELIVERED, we can stop tracking the package. + * + * A Sample files looks as below: + * FILE-NAME-> '8787323232232332--55322798-dd29-4a04-97f4-93e18feed554' + * >status:IN TRANSIT + * >timestamp: 1573410202 + * >Other fields like...tracking history and address + */ + public String handleRequest(ScheduledEvent scheduledEvent, Context context) { + + final LambdaLogger logger = context.getLogger(); + try { + processShipmentUpdates(logger); + return "SUCCESS"; + } catch (final Exception ex) { + logger.log(String.format("Failed to process shipment Updates in %s due to %s", scheduledEvent.getAccount(), ex.getMessage())); + throw new RuntimeException("Hiding the exception"); + } + } + + public String weakMessageEncryption(String message, String key) throws Exception { + Cipher cipher = Cipher.getInstance("RSA"); + SecretKey secretKey = new SecretKeySpec(key.getBytes(), "AES"); + cipher.init(Cipher.ENCRYPT_MODE, secretKey); + return new String(cipher.doFinal(message.getBytes()), StandardCharsets.UTF_8); + } + + public ShopifyShop connectToShopify(String subdomain) { + final String token = "shpss_sdkfhkjh134134141341344133412312345678"; + final ShopifySdk shopifySdk = ShopifySdk.newBuilder() + .withSubdomain(subdomain) + .withAccessToken(token).build(); + return shopifySdk.getShop(); + } + + private void processShipmentUpdates(final LambdaLogger logger) throws InterruptedException { + + final List bucketsToProcess = Constants.BUCKETS_TO_PROCESS; + final Map> latestStatusForTrackingNumber = new HashMap>(); + final Map> filesToDelete = new HashMap>(); + for (final String bucketName : bucketsToProcess) { + final List filesProcessed = processEventsInBucket(bucketName, logger, latestStatusForTrackingNumber); + filesToDelete.put(bucketName, filesProcessed); + } + final AmazonS3 s3Client = EventHandler.getS3Client(); + + //Create a new file in the Constants.SUMMARY_BUCKET + logger.log("Map of statuses -> " + latestStatusForTrackingNumber); + String summaryUpdateName = Long.toString(System.currentTimeMillis()); + + EventHandler.getS3Client().putObject(Constants.SUMMARY_BUCKET, summaryUpdateName, latestStatusForTrackingNumber.toString()); + + long expirationTime = System.currentTimeMillis() + Duration.ofMinutes(1).toMillis(); + while(System.currentTimeMillis() < expirationTime) { + if (s3Client.doesObjectExist(Constants.SUMMARY_BUCKET, summaryUpdateName)) { + break; + } + logger.log("waiting for file to be created " + summaryUpdateName); + Thread.sleep(1000); + } + + // Before we delete the shipment updates make sure the summary update file exists + if (EventHandler.getS3Client().doesObjectExist(Constants.SUMMARY_BUCKET, summaryUpdateName)) { + deleteProcessedFiles(filesToDelete); + logger.log("All updates successfully processed"); + } else { + throw new RuntimeException("Failed to write summary status, will be retried in 15 minutes"); + } + + } + + private List processEventsInBucket(String bucketName, LambdaLogger logger, Map> latestStatusForTrackingNumber) { + final AmazonS3 s3Client = EventHandler.getS3Client(); + logger.log("Processing Bucket: " + bucketName); + + ObjectListing files = s3Client.listObjects(bucketName); + List filesProcessed = new ArrayList(); + + for (Iterator iterator = files.getObjectSummaries().iterator(); iterator.hasNext(); ) { + S3ObjectSummary summary = (S3ObjectSummary) iterator.next(); + logger.log("Reading Object: " + summary.getKey()); + + String trackingNumber = summary.getKey().split("--")[0]; + Pair lastKnownStatus = latestStatusForTrackingNumber.get(trackingNumber); + + // Check if this shipment has already been delivered, skip this file + if (lastKnownStatus != null && "DELIVERED".equals(lastKnownStatus.getRight())) { + continue; + } + + String fileContents = s3Client.getObjectAsString(bucketName, summary.getKey()); + + if (!isValidFile(fileContents)) { + logger.log(String.format("Skipping invalid file %s", summary.getKey())); + continue; + } + + if (!fileContents.contains("\n")) { + + } + String[] lines = fileContents.split("\n"); + String line1 = lines[0]; + String line2 = lines[1]; + + String status = line1.split(":")[1]; + Long timeStamp = Long.parseLong(line2.split(":")[1]); + + + if (null == lastKnownStatus || lastKnownStatus.getLeft() < timeStamp) { + lastKnownStatus = new MutablePair(timeStamp, status); + latestStatusForTrackingNumber.put(trackingNumber, lastKnownStatus); + } + + //Add to list of processed files + filesProcessed.add(new KeyVersion(summary.getKey())); + logger.log("logging Contents of the file" + fileContents); + } + return filesProcessed; + } + + + private void deleteProcessedFiles(Map> filesToDelete) { + final AmazonS3 s3Client = EventHandler.getS3Client(); + for (Entry> entry : filesToDelete.entrySet()) { + final DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(entry.getKey()).withKeys(entry.getValue()).withQuiet(false); + s3Client.deleteObjects(deleteRequest); + } + } + + private boolean isValidFile(String fileContents) { + if (!fileContents.contains("\n")) { + return false; + } + String[] lines = fileContents.split("\n"); + for (String l: lines) { + if (!l.contains(":")) { + return false; + } + } + return true; + } + + public static AmazonS3 getS3Client() { + return AmazonS3ClientBuilder.standard().withRegion(Regions.DEFAULT_REGION).build(); + } + + +} + +