Skip to content

Commit

Permalink
Merge pull request #9 from amazon-connect/release-5.9
Browse files Browse the repository at this point in the history
5.9 release
  • Loading branch information
yingdoli authored Jan 6, 2021
2 parents 3a92672 + 824a7b6 commit bb06c0e
Show file tree
Hide file tree
Showing 14 changed files with 524 additions and 174 deletions.
66 changes: 37 additions & 29 deletions sam-app/lambda_functions/salesforce.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,40 +68,25 @@ def __load_credentials(self):
self.consumer_key = self.secrets["ConsumerKey"]
self.consumer_secret = self.secrets["ConsumerSecret"]
self.auth_token = self.secrets["AuthToken"] if "AuthToken" in self.secrets else ''
self.auth_token_expiration = self.secrets["AuthTokenExpiration"] or 0 if "AuthTokenExpiration" in self.secrets else 0
logger.info("Credentials Loaded")

def set_production(self):
self.login_host = 'https://login.salesforce.com'

def sign_in(self):
logger.info("Salesforce: Sign in")
time = datetime.datetime.utcnow()

if not self.auth_token or time >= datetime.datetime.utcfromtimestamp(self.auth_token_expiration): # stored access token no longer valid
logger.info("Retrieving new Salesforce OAuth token")
headers = { 'Content-Type': 'application/x-www-form-urlencoded' }
resp = self.request.post(url=self.login_host+"/services/oauth2/token", params=self.auth_data, headers=headers, hideData=True)
data = resp.json()
self.auth_token = self.secrets["AuthToken"] = data['access_token']
self.secrets["AuthTokenExpiration"] = (time + datetime.timedelta(minutes=13)).timestamp() # sf access token expires at most every 15 minutes; give ourselves a small buffer
self.secrets_manager_client.put_secret_value(SecretId=self.sf_credentials_secrets_manager_arn, SecretString=json.dumps(self.secrets))

self.headers = {
'Authorization': 'Bearer %s' % self.auth_token,
'Content-Type': 'application/json'
}
logger.info("Credentials Loaded")

def set_production(self):
self.login_host = 'https://login.salesforce.com'

def search(self, query):
logger.info("Salesforce: Search")
url = '%s/services/data/%s/search' % (self.host, self.version)
resp = self.request.get(url=url, params={'q':query}, headers=self.headers)
resp = self.makeRequest(self.request.get, **{"url": url, "params":{'q':query}})
return resp.json()['searchRecords']

def query(self, query):#TODO: create generator that takes care of subsequent request for more than 200 records
logger.info("Salesforce: Query")
url = '%s/services/data/%s/query' % (self.host, self.version)
resp = self.request.get(url=url, params={'q':query}, headers=self.headers)
resp = self.makeRequest(self.request.get, **{"url": url, "params":{'q':query}})
data = resp.json()
for record in data['records']:
del record['attributes']
Expand All @@ -110,7 +95,7 @@ def query(self, query):#TODO: create generator that takes care of subsequent req
def parameterizedSearch(self, data):#TODO: create generator that takes care of subsequent request for more than 200 records
logger.info("Salesforce: Query")
url = '%s/services/data/%s/parameterizedSearch' % (self.host, self.version)
resp = self.request.post(url=url, data=data, headers=self.headers)
resp = self.makeRequest(self.request.post, **{"url": url, "data": data})
data = resp.json()

for record in data['searchRecords']:
Expand All @@ -120,24 +105,24 @@ def parameterizedSearch(self, data):#TODO: create generator that takes care of s
def update(self, sobject, sobj_id, data):
logger.info("Salesforce: Update")
url = '%s/services/data/%s/sobjects/%s/%s' % (self.host, self.version, sobject, sobj_id)
resp = self.request.patch(url=url, data=data, headers=self.headers)
resp = self.makeRequest(self.request.patch, **{"url": url, "data": data})
return resp.status_code

def update_by_external(self, sobject, field, sobj_id, data):
logger.info("Salesforce: Update by external")
url = '%s/services/data/%s/sobjects/%s/%s/%s' % (self.host, self.version, sobject, field, sobj_id)
self.request.patch(url=url, data=data, headers=self.headers)
self.makeRequest(self.request.patch, **{"url": url, "data": data})

def create(self, sobject, data):
logger.info("Salesforce: Create")
url = '%s/services/data/%s/sobjects/%s' % (self.host, self.version, sobject)
resp = self.request.post(url=url, data=data, headers=self.headers)
resp = self.makeRequest(self.request.post, **{"url": url, "data": data})
return resp.json()['id']

def delete(self, sobject, sobject_id):
logger.info("Salesforce: Delete")
url = '%s/services/data/%s/sobjects/%s/%s' % (self.host, self.version, sobject, sobject_id)
resp = self.request.delete(url=url, headers=self.headers)
resp = self.makeRequest(self.request.delete, **{"url": url})

def is_authenticated(self):
return self.auth_token and self.host
Expand Down Expand Up @@ -175,7 +160,7 @@ def createChatterPost(self, data):
'feedElementType' : data['sf_feedElementType'],
'subjectId' : data['sf_subjectId']
}
resp = self.request.post(url=url, data=data, headers=self.headers)
resp = self.makeRequest(self.request.post, **{"url": url, "data": data})
return resp.json()['id']

def createChatterComment(self, sfeedElementId, data):
Expand All @@ -190,8 +175,25 @@ def createChatterComment(self, sfeedElementId, data):
}]
}
}
resp = self.request.post(url=url, data=data, headers=self.headers)
resp = self.makeRequest(self.request.post, **{"url": url, "data": data})
return resp.json()['id']

def makeRequest(self, requestMethod, **kwargs):
try:
return requestMethod(**kwargs, headers=self.headers)
except InvalidAuthTokenException as e:
# try re-fetching auth token
logger.info("Retrieving new Salesforce OAuth token")
headers = { 'Content-Type': 'application/x-www-form-urlencoded' }
resp = self.request.post(url=self.login_host+"/services/oauth2/token", params=self.auth_data, headers=headers, hideData=True)
data = resp.json()
self.auth_token = self.secrets["AuthToken"] = data['access_token']
self.headers = kwargs['headers'] = {
'Authorization': 'Bearer %s' % self.auth_token,
'Content-Type': 'application/json'
}
self.secrets_manager_client.put_secret_value(SecretId=self.sf_credentials_secrets_manager_arn, SecretString=json.dumps(self.secrets))
return requestMethod(**kwargs)

class Request:
def post(self, url, headers, data=None, params=None, hideData=False):
Expand Down Expand Up @@ -225,6 +227,9 @@ def __check_resp__(resp):
if resp.status_code // 100 == 2:
return resp

if resp.status_code == 401:
raise InvalidAuthTokenException("")

data = resp.json()
if 'error' in data:
msg = "%s: %s" % (data['error'], data['error_description'])
Expand All @@ -240,4 +245,7 @@ def __check_resp__(resp):

msg = "request returned status code: %d" % resp.status_code
logger.error(msg)
raise Exception(msg)
raise Exception(msg)

class InvalidAuthTokenException(Exception):
pass
2 changes: 1 addition & 1 deletion sam-app/lambda_functions/sfCTRTrigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def send_data(data_to_send):
event_to_send = event_template.copy()
event_to_send['record'] = data_to_send

if os.environ["POSTCALL_RECORDING_IMPORT_ENABLED"] == 'true' or os.environ["POSTCALL_TRANSCRIBE_ENABLED"] == 'true' or os.environ["CONTACT_LENS_IMPORT_ENABLED"] == 'true':
if os.environ["POSTCALL_RECORDING_IMPORT_ENABLED"] == 'true' or os.environ["POSTCALL_TRANSCRIBE_ENABLED"] == 'true':
logger.info('Invoke EXECUTE_TRANSCRIPTION_STATE_MACHINE_LAMBDA')
boto3.client('lambda').invoke(FunctionName=os.environ["EXECUTE_TRANSCRIPTION_STATE_MACHINE_LAMBDA"], InvocationType='Event', Payload=json.dumps(event_to_send))

Expand Down
158 changes: 158 additions & 0 deletions sam-app/lambda_functions/sfContactLensUtil.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""
You must have an AWS account to use the Amazon Connect CTI Adapter.
Downloading and/or using the Amazon Connect CTI Adapter is subject to the terms of the AWS Customer Agreement,
AWS Service Terms, and AWS Privacy Notice.
© 2017, Amazon Web Services, Inc. or its affiliates. All rights reserved.
NOTE: Other license terms may apply to certain, identified software components
contained within or distributed with the Amazon Connect CTI Adapter if such terms are
included in the LibPhoneNumber-js and Salesforce Open CTI. For such identified components,
such other license terms will then apply in lieu of the terms above.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import logging, os, json, boto3
logger = logging.getLogger()
logger.setLevel(logging.getLevelName(os.environ["LOGGING_LEVEL"]))

def getDataSource():
return 'Contact_Lens'

def processContactLensTranscript(iItems, participants):
customerTranscripts = []
agentTranscripts = []
finalTranscripts = []

for iTranscript in iItems:
transcript = {}
transcript['id'] = iTranscript['Id']
transcript['participantId'] = iTranscript['ParticipantId'] # For now it's either AGENT or CUSTOMER
transcript['beginOffsetMillis'] = iTranscript['BeginOffsetMillis']
transcript['endOffsetMillis'] = iTranscript['EndOffsetMillis']
transcript['content'] = iTranscript['Content']
transcript['sentiment'] = iTranscript['Sentiment']
transcript['loudness_score'] = iTranscript['LoudnessScore'] # array
if 'IssuesDetected' in iTranscript:
transcript['issues_detected'] = iTranscript['IssuesDetected']
if 'Redaction' in iTranscript:
transcript['redaction'] = iTranscript['Redaction']

finalTranscripts.append(transcript)
if iTranscript['ParticipantId'] == 'AGENT':
transcript['participantRole'] = getParticipantRole('AGENT', participants)
agentTranscripts.append(transcript)
elif iTranscript['ParticipantId'] == 'CUSTOMER':
transcript['participantRole'] = getParticipantRole('CUSTOMER', participants)
customerTranscripts.append(transcript)


return {'customerTranscripts' : customerTranscripts, 'agentTranscripts' : agentTranscripts, 'finalTranscripts': finalTranscripts}


def processContactLensConversationCharacteristics(contactLensObj, connectBucket, transcripts):
resultSet = {}

# Overall Sentiment
resultSet['contactLensAgentOverallSentiment'] = contactLensObj['ConversationCharacteristics']['Sentiment']['OverallSentiment']['AGENT'] if 'AGENT' in contactLensObj['ConversationCharacteristics']['Sentiment']['OverallSentiment'] else None
resultSet['contactLensCustomerOverallSentiment'] = contactLensObj['ConversationCharacteristics']['Sentiment']['OverallSentiment']['CUSTOMER'] if 'CUSTOMER' in contactLensObj['ConversationCharacteristics']['Sentiment']['OverallSentiment'] else None

# Sentiment By Period
if 'CUSTOMER' in contactLensObj['ConversationCharacteristics']['Sentiment']['SentimentByPeriod']['QUARTER']:
customerSentimentCurve = []
customerSentimentCurveLabel = 'Other'
for quarter in contactLensObj['ConversationCharacteristics']['Sentiment']['SentimentByPeriod']['QUARTER']['CUSTOMER']:
customerSentimentCurve.append(quarter['Score'])
customerSentimentCurve[1:3] = [sum(customerSentimentCurve[1:3]) / 2]
if (customerSentimentCurve[0] <= customerSentimentCurve[1] - 1) & (customerSentimentCurve[1] < customerSentimentCurve[2]):
customerSentimentCurveLabel = 'S'
elif (customerSentimentCurve[0] >= customerSentimentCurve[1] + 1) & (customerSentimentCurve[1] > customerSentimentCurve[2]):
customerSentimentCurveLabel = 'Z'
resultSet['contactLensCustomerSentimentCurve'] = customerSentimentCurveLabel
else:
resultSet['contactLensCustomerSentimentCurve'] = None

# Interruptions Total Count
resultSet['contactLensInterruptions'] = contactLensObj['ConversationCharacteristics']['Interruptions']['TotalCount']
resultSet['contactLensAgentInterruptions'] = json.dumps(contactLensObj['ConversationCharacteristics']['Interruptions']['InterruptionsByInterrupter']['AGENT']) if 'AGENT' in contactLensObj['ConversationCharacteristics']['Interruptions']['InterruptionsByInterrupter'] else None
resultSet['contactLensCustomerInterruptions'] = json.dumps(contactLensObj['ConversationCharacteristics']['Interruptions']['InterruptionsByInterrupter']['CUSTOMER']) if 'CUSTOMER' in contactLensObj['ConversationCharacteristics']['Interruptions']['InterruptionsByInterrupter'] else None


# Non Talk Time
resultSet['contactLensNonTalkTime'] = contactLensObj['ConversationCharacteristics']['NonTalkTime']['TotalTimeMillis']

# Talk Speed
resultSet['contactLensTalkSpeedCustomer'] = contactLensObj['ConversationCharacteristics']['TalkSpeed']['DetailsByParticipant']['CUSTOMER']['AverageWordsPerMinute']
resultSet['contactLensTalkSpeedAgent'] = contactLensObj['ConversationCharacteristics']['TalkSpeed']['DetailsByParticipant']['AGENT']['AverageWordsPerMinute']

# Talk time
resultSet['contactLensTalkTimeTotal'] = contactLensObj['ConversationCharacteristics']['TalkTime']['TotalTimeMillis']
resultSet['contactLensTalkTimeCustomer'] = contactLensObj['ConversationCharacteristics']['TalkTime']['DetailsByParticipant']['CUSTOMER']['TotalTimeMillis']
resultSet['contactLensTalkTimeAgent'] = contactLensObj['ConversationCharacteristics']['TalkTime']['DetailsByParticipant']['AGENT']['TotalTimeMillis']

# Categories
resultSet['contactLensMatchedCategories'] = '|'.join(contactLensObj['Categories']['MatchedCategories']) if len(contactLensObj['Categories']['MatchedCategories']) > 0 else None
resultSet['contactLensMatchedDetails'] = json.dumps(contactLensObj['Categories']['MatchedDetails'])

# Recording Path
contactAttributes = getContactAttributes(contactLensObj)
contactId = contactLensObj['CustomerMetadata']['ContactId']
if ('postcallRedactedRecordingImportEnabled' in contactAttributes and contactAttributes['postcallRedactedRecordingImportEnabled'] == 'true'):
logger.info('Redacted recording import is enabled')
redactedRecordingLocation = getRedactedRecordingLocation(ContactId, connectBucket)
resultSet['recordingPath'] = redactedRecordingLocation
else:
resultSet['recordingPath'] = None

# Transcript Full Text
transcriptsText = []
if len(transcripts) > 0:
for transcript in transcripts:
transcriptsText.append(transcript["content"])
resultSet['contactLensTranscriptsFullText'] = ' '.join(transcriptsText)

return resultSet

def getParticipantRole(participantId, participants):
for participant in participants:
if participant['ParticipantId'] == participantId:
return participant['ParticipantRole']
logger.warning('Participant Role not found for participant id: %s' % participantId)
return ''

def getRedactedRecordingLocation(contactId, connectBucket):
logger.info('Retrieving Redacted Recording S3 Location, contact ID is: %s', contactId)
redactedRecordingKey = contactId + '_call_recording_redacted_'

# Using paginator because S3 only returns up to 1000 objects from list_objects_v2() method
client = boto3.client('s3')
paginator = client.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=connectBucket, Prefix='Analysis/Voice/Redacted')
for page in pages:
for obj in page['Contents']:
if redactedRecordingKey in obj['Key'] and obj['Key'].endswith('.wav'):
redactedRecordingLocation = connectBucket + '/' + obj['Key']
return redactedRecordingLocation
logger.warn('Redacted Recording Not Found!')
return ''

def getContactAttributes(contactLensObj):
client = boto3.client('connect')
try:
connect_response = client.get_contact_attributes(
InstanceId=contactLensObj['CustomerMetadata']['InstanceId'],
InitialContactId=contactLensObj['CustomerMetadata']['ContactId']
)
return connect_response["Attributes"]
except Exception as e:
logger.error('Error when retrieving contact attribute: {}'.format(e))
1 change: 0 additions & 1 deletion sam-app/lambda_functions/sfContactTraceRecord.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ def create_ctr_record(ctr):
logger.info(f'Record : {sf_request}')

sf = Salesforce()
sf.sign_in()
sf.update_by_external(objectnamespace + "AC_ContactTraceRecord__c", objectnamespace + 'ContactId__c', ctr['ContactId'], sf_request)

logger.info(f'Record Created Successfully')
11 changes: 9 additions & 2 deletions sam-app/lambda_functions/sfExecuteTranscriptionStateMachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ def process_record(record):
if('postcallRecordingImportEnabled' in recordObj["Attributes"] and recordObj["Attributes"]["postcallRecordingImportEnabled"]=='true'):
logger.info('postcallRecordingImportEnabled = true')
createACContactChannelAnalyticsSalesforceObject(recordObj['ContactId'], recordObj['Recording']['Location'])

#check if postcallRedactedRecordingImportEnabled then create AC_ContactChannelAnalytics__c only with ContactId
elif('postcallRedactedRecordingImportEnabled' in recordObj["Attributes"] and recordObj["Attributes"]["postcallRedactedImportEnabled"]=="true"):
logger.info('postcallRedactedRecordingImportEnabled = true')
createACContactChannelAnalyticsSalesforceObject(recordObj['ContactId'])

#check if postcallTranscribeEnabled then start the transcribing process
if('postcallTranscribeEnabled' in recordObj["Attributes"] and recordObj["Attributes"]["postcallTranscribeEnabled"]=='true' and "postcallTranscribeLanguage" in recordObj["Attributes"]):
executeStateMachine(recordObj['Recording']['Location'], recordObj['ContactId'], recordObj["Attributes"]["postcallTranscribeLanguage"])
Expand Down Expand Up @@ -148,7 +154,7 @@ def lambda_handler(event, context):
except Exception as e:
raise e

def createACContactChannelAnalyticsSalesforceObject(contactId, recordingPath):
def createACContactChannelAnalyticsSalesforceObject(contactId, recordingPath = None):
pnamespace = os.environ['SF_ADAPTER_NAMESPACE']
if not pnamespace or pnamespace == '-':
logger.info("SF_ADAPTER_NAMESPACE is empty")
Expand All @@ -160,7 +166,8 @@ def createACContactChannelAnalyticsSalesforceObject(contactId, recordingPath):
sfRequest['Details']['Parameters']['sf_operation'] = 'create'
sfRequest['Details']['Parameters']['sf_object'] = pnamespace + 'AC_ContactChannelAnalytics__c'
sfRequest['Details']['Parameters'][pnamespace + 'ContactId__c'] = contactId
sfRequest['Details']['Parameters'][pnamespace + 'RecordingPath__c'] = recordingPath
if recordingPath is not None:
sfRequest['Details']['Parameters'][pnamespace + 'RecordingPath__c'] = recordingPath

ACContactChannelAnalyticsId = invokeSfAPI(sfRequest)['Id']
logger.info('SF Object Created, with ID: %s' % ACContactChannelAnalyticsId)
Expand Down
1 change: 0 additions & 1 deletion sam-app/lambda_functions/sfIntervalAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ def lambda_handler(event, context):
data = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode()
logger.info("sfIntervalAgent data: %s" % data)
sf = Salesforce()
sf.sign_in()


for record in csv.DictReader(data.split("\n")):
Expand Down
1 change: 0 additions & 1 deletion sam-app/lambda_functions/sfIntervalQueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ def lambda_handler(event, context):
data = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode()

sf = Salesforce()
sf.sign_in()

for record in csv.DictReader(data.split("\n")):
queue_record = prepare_queue_record(record, event_record['eventTime'])
Expand Down
Loading

0 comments on commit bb06c0e

Please sign in to comment.