This repository has been archived by the owner on Nov 1, 2024. It is now read-only.

Merge pull request #32 from SafetyCulture/INTG-501
INTG-501 Update CSV format to exactly match the Tableau format
Redbeard0220 authored Aug 29, 2017
2 parents b356302 + 24b361d commit 8ad0af1
Showing 40 changed files with 1,176 additions and 632 deletions.
5 changes: 3 additions & 2 deletions tools/exporter/ReadMe.md
@@ -59,6 +59,7 @@ python exporter.py --format json
python csvExporter.py path/to/audit_file.json
```
* Basic example of [CSV Export Format](https://github.com/SafetyCulture/safetyculture-sdk-python/blob/master/tools/exporter/tests/csv_test_files/unit_test_single_question_yes___no___na_answered_no_expected_output.csv)
* [Explanation and details of CSV format](https://support.safetyculture.com/integrations/safetyculture-csv-exporter-tool/#format)

#### Bulk CSV Export
* Each Audit is the same format as the single Audit CSV export
@@ -70,10 +71,10 @@ To export Multiple Audits to Bulk CSV file:
python exporter.py --format csv
```

#### CSV values whose format does not match JSON properties
#### The format of the following CSV values does not match the format used by the SafetyCulture API Audit JSON
##### Date/Time field
* JSON: `2017-03-03T03:45:58.090Z`
* CSV: Date Value: `03 March 2017` and Time Value: `03:45AM`
* CSV: `03 March 2017 03:45 AM`
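
A minimal sketch of that conversion, for reference (it mirrors the `format_date_time` helper added to `csvExporter.py` in this PR; the function name here is illustrative only):

```
from datetime import datetime

def to_csv_date_time(iso_timestamp):
    # e.g. '2017-03-03T03:45:58.090Z' -> '03 March 2017 03:45 AM'
    date_object = datetime.strptime(iso_timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
    return date_object.strftime('%d %B %Y') + ' ' + date_object.strftime('%I:%M %p')
```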
##### Checkbox field
* JSON: `1` or `0`
* CSV: `True` or `False`
179 changes: 124 additions & 55 deletions tools/exporter/csvExporter.py
@@ -6,36 +6,43 @@
from datetime import datetime

CSV_HEADER_ROW = [
'Item Type',
'ItemType',
'Label',
'Response',
'Comment',
'Media Hypertext Reference',
'Location Coordinates',
'Item Score',
'Item Max Score',
'Item Score Percentage',
'MediaHypertextReference',
'Latitude',
'Longitude',
'ItemScore',
'ItemMaxScore',
'ItemScorePercentage',
'Mandatory',
'Failed Response',
'FailedResponse',
'Inactive',
'Item ID',
'Response ID',
'Parent ID',
'Audit Owner',
'Audit Author',
'Audit Name',
'Audit Score',
'Audit Max Score',
'Audit Score Percentage',
'Audit Duration (seconds)',
'Date Started',
'Time Started',
'Date Completed',
'Time Completed',
'Audit ID',
'Template ID',
'Template Name',
'Template Author'
'ItemID',
'ResponseID',
'ParentID',
'AuditOwner',
'AuditAuthor',
'AuditName',
'AuditScore',
'AuditMaxScore',
'AuditScorePercentage',
'AuditDuration',
'DateStarted',
'DateCompleted',
'DateModified',
'AuditID',
'TemplateID',
'TemplateName',
'TemplateAuthor',
'ItemCategory',
'DocumentNo',
'ConductedOn',
'PreparedBy',
'Location',
'Personnel',
'ClientSite'
]

# audit item empty response
@@ -119,6 +126,16 @@
'b5c92352-e11b-11e1-9b23-0800200c9a66': 'N/A'
}

# maps header fields to their static IDs
header_field_id = {
'DocumentNo': 'f3245d46-ea77-11e1-aff1-0800200c9a66',
'ConductedOn': 'f3245d42-ea77-11e1-aff1-0800200c9a66',
'PreparedBy': 'f3245d43-ea77-11e1-aff1-0800200c9a66',
'Location': 'f3245d44-ea77-11e1-aff1-0800200c9a66',
'Personnel': 'f3245d45-ea77-11e1-aff1-0800200c9a66',
'ClientSite': 'f3245d41-ea77-11e1-aff1-0800200c9a66'
}
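# get_header_item() below looks these fixed item_ids up in the audit's header_items
# to fill the new header columns (DocumentNo, ConductedOn, PreparedBy, Location, Personnel, ClientSite).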


def get_json_property(obj, *args):
"""
@@ -156,8 +173,10 @@ def __init__(self, audit_json, export_inactive_items=True):
"""
self.audit_json = audit_json
self.export_inactive_items = export_inactive_items
self.map_items()
self.audit_table = self.convert_audit_to_table()


def audit_id(self):
"""
:return: The audit ID
@@ -170,6 +189,35 @@ def audit_items(self):
"""
return self.audit_json['header_items'] + self.audit_json['items']

def map_items(self):
"""
Creates a dictionary which maps each item to its parent ID, Label, and Type.
This tree can then be traversed recursively to find the Category or Section of a given item.
"""
self.item_category = EMPTY_RESPONSE
self.item_map = {}
for item in self.audit_items():
if item.get('item_id'):
self.item_map[item['item_id']] = {
'parent_id': item.get('parent_id') or EMPTY_RESPONSE,
'label': item.get('label') or EMPTY_RESPONSE,
'type': item.get('type') or EMPTY_RESPONSE
}

def get_item_category(self, item_id):
"""
Recursively traverses the item Map, following parent IDs until it gets to a Section or Category.
When a Section or Category is found, the item Category is set to the label of that Section or Category.
:param item_id: item ID to find Category for
:return: Category or Section label
"""
if not item_id:
return EMPTY_RESPONSE
elif self.item_map[item_id]['type'] == 'section' or self.item_map[item_id]['type'] == 'category':
return self.item_map[item_id]['label'] or EMPTY_RESPONSE
else:
return self.get_item_category(self.item_map[item_id]['parent_id'])

def audit_custom_response_id_to_label_map(self):
"""
:return: dictionary mapping custom response_id's to their label
@@ -188,6 +236,7 @@ def common_audit_data(self):
audit_data_property = self.audit_json['audit_data']
template_data_property = self.audit_json['template_data']
audit_date_completed = audit_data_property['date_completed']
header_data = self.audit_json['header_items']
audit_data_as_list = list()
audit_data_as_list.append(audit_data_property['authorship']['owner'])
audit_data_as_list.append(audit_data_property['authorship']['author'])
@@ -196,41 +245,54 @@
audit_data_as_list.append(audit_data_property['total_score'])
audit_data_as_list.append(audit_data_property[SCORE_PERCENTAGE])
audit_data_as_list.append(audit_data_property['duration'])
audit_data_as_list.append(self.format_date(audit_data_property['date_started']))
audit_data_as_list.append(self.format_time(audit_data_property['date_started']))
audit_data_as_list.append(self.format_date(audit_date_completed))
audit_data_as_list.append(self.format_time(audit_date_completed))
audit_data_as_list.append(self.format_date_time(audit_data_property['date_started']))
audit_data_as_list.append(self.format_date_time(audit_date_completed))
audit_data_as_list.append(self.format_date_time(audit_data_property['date_modified']))
audit_data_as_list.append(self.audit_id())
audit_data_as_list.append(self.audit_json['template_id'])
audit_data_as_list.append(template_data_property['metadata']['name'])
audit_data_as_list.append(template_data_property['authorship']['author'])
audit_data_as_list.append(self.item_category)
audit_data_as_list.append(self.get_header_item(header_data, 'DocumentNo'))
audit_data_as_list.append(self.get_header_item(header_data, 'ConductedOn'))
audit_data_as_list.append(self.get_header_item(header_data, 'PreparedBy'))
audit_data_as_list.append(self.get_header_item(header_data, 'Location'))
audit_data_as_list.append(self.get_header_item(header_data, 'Personnel'))
audit_data_as_list.append(self.get_header_item(header_data, 'ClientSite'))
return audit_data_as_list

@staticmethod
def format_date(date):
"""
:param date: date in the format: 2017-03-03T03:45:58.090Z
:return: date in the format: '03 March 2017',
"""
if date:
date_object = datetime.strptime(date, '%Y-%m-%dT%H:%M:%S.%fZ')
formatted_date = date_object.strftime('%d %B %Y')
return formatted_date
else:
return ''
def get_header_item(self, header_data, header_item_type):
"""
:param header_data: list of header items from the audit JSON
:param header_item_type: header field name to look up, e.g. 'DocumentNo' or 'ConductedOn'
:return: the matching item's text, datetime, or location_text response, or EMPTY_RESPONSE if none is found
"""
for item in header_data:
if item.get('item_id') == header_field_id.get(header_item_type):
if 'responses' not in item.keys():
return EMPTY_RESPONSE
if 'text' in item['responses'].keys():
return get_json_property(item, 'responses', 'text')
if 'datetime' in item['responses'].keys():
return get_json_property(item, 'responses', 'datetime')
if 'location_text' in item['responses'].keys():
return get_json_property(item, 'responses', 'location_text')
return EMPTY_RESPONSE

@staticmethod
def format_time(date):
def format_date_time(date):
"""
:param date: date in the format: 2017-03-03T03:45:58.090Z
:return: time in the format '03:45AM'
:return: date and time in the format: '03 March 2017 03:45 AM',
"""
if date:
date_object = datetime.strptime(date, '%Y-%m-%dT%H:%M:%S.%fZ')
formatted_time = date_object.strftime('%I:%M%p')
return formatted_time
formatted_date = date_object.strftime('%d %B %Y')
formatted_time = date_object.strftime('%I:%M %p')
return formatted_date + ' ' + formatted_time
else:
return ''
return EMPTY_RESPONSE

def convert_audit_to_table(self):
"""
@@ -240,6 +302,10 @@ def convert_audit_to_table(self):
"""
self.audit_table = []
for item in self.audit_items():
if item.get('parent_id'):
self.item_category = self.get_item_category(item['parent_id'])
else:
self.item_category = EMPTY_RESPONSE
row_array = self.item_properties_as_list(item) + self.common_audit_data()
if get_json_property(item, INACTIVE) and not self.export_inactive_items:
continue
@@ -323,8 +389,7 @@ def get_item_response(self, item):
elif item_type == 'smartfield':
response = get_json_property(item, 'evaluation')
elif item_type == 'datetime':
response = self.format_date(get_json_property(item, RESPONSES, 'datetime'))
response = response + ' at ' + self.format_time(get_json_property(item, RESPONSES, 'datetime'))
response = self.format_date_time(get_json_property(item, RESPONSES, 'datetime'))
elif item_type == 'text' or item_type == 'textsingle':
response = get_json_property(item, RESPONSES, 'text')
elif item_type == INFORMATION and get_json_property(item, 'options', TYPE) == 'link':
@@ -454,29 +519,33 @@ def get_item_location_coordinates(self, item):
item_type = get_json_property(item, TYPE)
if item_type == 'address':
location_coordinates = get_json_property(item, 'responses', 'location', 'geometry', 'coordinates')
if isinstance(location_coordinates, list):
return str(location_coordinates).strip('[]')
return EMPTY_RESPONSE
if isinstance(location_coordinates, list) and len(location_coordinates):
return str(location_coordinates).strip('[]').split(',')
return [EMPTY_RESPONSE, EMPTY_RESPONSE]

def item_properties_as_list(self, item):
"""
Returns selected properties of the audit item JSON as a list
:param item: single item in JSON format
:return: array of item data, in format that CSV writer can handle
"""
location_coordinates = self.get_item_location_coordinates(item)
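# The address item's 'geometry' coordinates appear to follow GeoJSON order, i.e. [longitude, latitude], hence the reversed indexing below.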
latitude = location_coordinates[1]
longitude = location_coordinates[0]
return [
self.get_item_type(item),
self.get_item_label(item),
self.get_item_response(item),
get_json_property(item, RESPONSES, 'text') if item.get(TYPE) not in ['text', 'textsingle'] else EMPTY_RESPONSE,
self.get_item_media(item),
self.get_item_location_coordinates(item),
latitude,
longitude,
self.get_item_score(item),
self.get_item_max_score(item),
self.get_item_score_percentage(item),
get_json_property(item, 'options', 'is_mandatory'),
get_json_property(item, RESPONSES, FAILED),
get_json_property(item, INACTIVE),
get_json_property(item, 'options', 'is_mandatory') or False,
get_json_property(item, RESPONSES, FAILED) or False,
get_json_property(item, INACTIVE) or False,
get_json_property(item, ID),
self.get_item_response_id(item),
get_json_property(item, PARENT_ID)
2 changes: 1 addition & 1 deletion tools/exporter/exporter.py
@@ -625,7 +625,7 @@ def get_media_from_audit(logger, audit_json):
# This condition checks for media attached to signature and drawing type fields.
if 'responses' in item.keys() and 'image' in item['responses'].keys():
media_id_list.append(item['responses']['image']['media_id'])
# This condition checks for media attached to information type fields.
# This condition checks for media attached to information type fields.
if 'options' in item.keys() and 'media' in item['options'].keys():
media_id_list.append(item['options']['media']['media_id'])
logger.info("Discovered {0} media files associated with {1}.".format(len(media_id_list), audit_json['audit_id']))
