Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improved function based on a larger redcap csv (bridge2ai) #33

Merged
merged 23 commits into from
Jan 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 85 additions & 24 deletions reproschema/redcap2reproschema.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
import yaml
from bs4 import BeautifulSoup

matrix_group_count = {}


def clean_header(header):
return {k.lstrip("\ufeff"): v for k, v in header.items()}


def normalize_condition(condition_str):
re_parentheses = re.compile(r"\(([0-9]*)\)")
Expand Down Expand Up @@ -34,33 +40,60 @@ def process_visibility(data):
return visibility_obj


def parse_field_type_and_value(data, input_type_map):
field_type = data.get("Field Type", "")

def parse_field_type_and_value(field, input_type_map):
field_type = field.get("Field Type", "")
input_type = input_type_map.get(field_type, field_type)

# Initialize the default value type as string
value_type = "xsd:string"

# Map certain field types directly to xsd types
value_type_map = {
"number": "xsd:int",
"text": "xsd:string",
"date_": "xsd:date",
"datetime_": "datetime",
"time_": "xsd:date",
"email": "email",
"phone": "phone",
}
validation_type = data.get("Text Validation Type OR Show Slider Number", "")

value_type = value_type_map.get(validation_type, "xsd:string")
"datetime_": "xsd:dateTime",
"time_": "xsd:time",
"email": "xsd:string",
"phone": "xsd:string",
} # todo: input_type="signature"

# Get the validation type from the field, if available
validation_type = field.get(
"Text Validation Type OR Show Slider Number", ""
).strip()

if validation_type:
# Map the validation type to an XSD type if it's in the map
value_type = value_type_map.get(validation_type, "xsd:string")
elif field_type in ["radio", "dropdown"]:
# If there's no validation type, but the field type is radio or dropdown, use xsd:integer
value_type = "xsd:integer"

return input_type, value_type


def process_choices(choices_str):
def process_choices(field_type, choices_str):
if field_type not in ["radio", "dropdown"]: # Handle only radio and dropdown types
return None

choices = []
for choice in choices_str.split("|"):
parts = choice.split(", ")
choice_obj = {"schema:value": int(parts[0]), "schema:name": parts[1]}
if len(parts) < 2:
print(
f"Warning: Skipping invalid choice format '{choice}' in a {field_type} field"
)
continue

# Try to convert the first part to an integer, if it fails, keep it as a string
try:
value = int(parts[0])
except ValueError:
value = parts[0]

choice_obj = {"name": parts[1], "value": value}
if len(parts) == 3:
# TODO: handle image url
# Handle image url
choice_obj["schema:image"] = f"{parts[2]}.png"
choices.append(choice_obj)
return choices
Expand Down Expand Up @@ -90,10 +123,12 @@ def parse_html(input_string, default_language="en"):
text = element.get_text(strip=True)
if text:
result[lang] = text
if not result:
if not result: # If no text was extracted
result[default_language] = soup.get_text(strip=True)
else:
result[default_language] = input_string
result[default_language] = soup.get_text(
strip=True
) # Use the entire text as default language text

return result

Expand All @@ -109,9 +144,22 @@ def process_row(
response_list,
additional_notes_list,
):
global matrix_group_count
matrix_group_name = field.get("Matrix Group Name", "")
if matrix_group_name:
matrix_group_count[matrix_group_name] = (
matrix_group_count.get(matrix_group_name, 0) + 1
)
item_id = f"{matrix_group_name}_{matrix_group_count[matrix_group_name]}"
else:
item_id = field.get("Variable / Field Name", "")

rowData = {
"@context": schema_context_url,
"@type": "reproschema:Field",
"@id": item_id,
"prefLabel": item_id,
"description": f"{item_id} of {form_name}",
}

field_type = field.get("Field Type", "")
Expand All @@ -124,8 +172,20 @@ def process_row(
if value_type:
rowData["responseOptions"] = {"valueType": value_type}

if field_type == "yesno":
rowData["responseOptions"] = {
"valueType": "xsd:boolean",
"choices": [{"name": "Yes", "value": 1}, {"name": "No", "value": 0}],
}

for key, value in field.items():
if schema_map.get(key) == "allow" and value:
if (
schema_map.get(key) in ["question", "schema:description", "preamble"]
and value
):
rowData.update({schema_map[key]: parse_html(value)})

elif schema_map.get(key) == "allow" and value:
rowData.setdefault("ui", {}).update({schema_map[key]: value.split(", ")})

elif key in ui_list and value:
Expand All @@ -139,8 +199,9 @@ def process_row(
rowData.setdefault("responseOptions", {}).update({schema_map[key]: value})

elif schema_map.get(key) == "choices" and value:
# Pass both field_type and value to process_choices
rowData.setdefault("responseOptions", {}).update(
{"choices": process_choices(value)}
{"choices": process_choices(field_type, value)}
)

elif schema_map.get(key) == "scoringLogic" and value:
Expand All @@ -159,9 +220,6 @@ def process_row(
{"variableName": field["Variable / Field Name"], "isVis": condition}
)

elif key in ["question", "schema:description", "preamble"] and value:
rowData.update({schema_map[key]: parse_html(value)})

elif key == "Identifier?" and value:
identifier_val = value.lower() == "y"
rowData.update(
Expand Down Expand Up @@ -190,6 +248,9 @@ def create_form_schema(
matrix_list,
scores_list,
):
# Use a set to track unique items and preserve order
unique_order = list(dict.fromkeys(order.get(form_name, [])))

# Construct the JSON-LD structure
json_ld = {
"@context": schema_context_url,
Expand All @@ -200,7 +261,7 @@ def create_form_schema(
"schemaVersion": "1.0.0-rc4",
"version": "0.0.1",
"ui": {
"order": order.get(form_name, []),
"order": unique_order,
"addProperties": bl_list,
"shuffle": False,
},
Expand Down Expand Up @@ -310,6 +371,7 @@ def process_csv(
with open(csv_file, mode="r", encoding="utf-8") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
row = clean_header(row)
form_name = row["Form Name"]
if form_name not in datas:
datas[form_name] = []
Expand Down Expand Up @@ -484,7 +546,6 @@ def main():
parser.add_argument("yaml_file", help="Path to the Reproschema protocol YAML file.")
args = parser.parse_args()

# Call the main conversion function
redcap2reproschema(args.csv_file, args.yaml_file)


Expand Down
Loading