fix test errors after switching to pandas
yibeichan committed Dec 4, 2024
1 parent efb170d commit c6aa687
Showing 2 changed files with 94 additions and 61 deletions.
128 changes: 80 additions & 48 deletions reproschema/redcap2reproschema.py
@@ -83,7 +83,7 @@ def clean_header(header):
     cleaned_header = {}
     for k, v in header.items():
         # Strip BOM, whitespace, and enclosing quotation marks if present
-        cleaned_key = k.lstrip("\ufeff").strip().strip('"')
+        cleaned_key = k.lstrip("\ufeff").strip().strip('"') if isinstance(k, str) else k
         cleaned_header[cleaned_key] = v
     return cleaned_header
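
Note: a minimal sketch (not from the commit) of why the isinstance guard matters. After the switch to pandas, a malformed or unnamed column can surface as a non-string key such as float("nan"), and calling .lstrip on it raises AttributeError; the header dict below is hypothetical:

    import math

    # Hypothetical header as pandas might hand it back: one BOM-prefixed key, one NaN key
    header = {"\ufeffRecord ID": "1", float("nan"): ""}

    cleaned = {}
    for k, v in header.items():
        # The guarded expression from the commit: only strings get cleaned
        cleaned_key = k.lstrip("\ufeff").strip().strip('"') if isinstance(k, str) else k
        cleaned[cleaned_key] = v

    assert "Record ID" in cleaned  # BOM stripped from the string key
    assert any(isinstance(k, float) and math.isnan(k) for k in cleaned)  # NaN key passed through untouched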

@@ -100,6 +100,12 @@ def normalize_condition(condition_str, field_type=None):
         return False
     elif condition_str is None:
         return None
+    elif not isinstance(condition_str, str):
+        # Convert non-string types to string, or return as is if conversion doesn't make sense
+        try:
+            condition_str = str(condition_str)
+        except:
+            return condition_str
 
     re_parentheses = re.compile(r"\(([0-9]*)\)")
     re_non_gt_lt_equal = re.compile(r"([^>|<])=")
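
Note: a small sketch of the coercion this hunk adds, assuming the rest of normalize_condition operates on strings. pandas parses purely numeric cells as floats, so a branching-logic or calculation cell holding 2 arrives as 2.0 and must be stringified before the regexes above can run:

    condition = 2.0  # what pd.read_csv yields for a numeric cell
    if not isinstance(condition, str):
        try:
            condition = str(condition)
        except Exception:
            condition = condition  # the commit returns the value unchanged in this case
    assert condition == "2.0"  # now safe to feed to re.compile / re.sub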
@@ -138,9 +144,9 @@ def process_field_properties(data):
     else:
         condition = True
 
-    # Check Field Annotation for special flags
-    annotation = data.get("Field Annotation", "").upper()
-    if condition and (
+    # Check Field Annotation for special flags - safely handle non-string values
+    annotation = str(data.get("Field Annotation", "")).upper() if data.get("Field Annotation") is not None else ""
+    if condition and isinstance(annotation, str) and (
         "@READONLY" in annotation
         or "@HIDDEN" in annotation
         or "@CALCTEXT" in annotation
@@ -152,12 +158,15 @@
         "isAbout": f"items/{data['Variable / Field Name']}",
         "isVis": condition,
     }
-    if data["Required Field?"]:
-        if data["Required Field?"] in "y":
+
+    # Handle Required Field check, accounting for NaN values and empty strings
+    required_field = data.get("Required Field?")
+    if pd.notna(required_field) and str(required_field).strip():  # Check if value is not NaN and not empty
+        if str(required_field).lower() == "y":
             prop_obj["valueRequired"] = True
-        else:
-            raise (
-                f"value {data['Required Field?']} not supported yet for redcap:Required Field?"
+        elif str(required_field).lower() not in ["", "n"]:  # Only raise error for unexpected values
+            raise ValueError(
+                f"value {required_field} not supported yet for redcap:Required Field?"
             )
     return prop_obj
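
Note: a quick sketch of the failure mode the pd.notna guard addresses, assuming the row dict now comes from pandas. An empty "Required Field?" cell arrives as NaN, which is truthy in Python, so the old `if data["Required Field?"]:` entered the branch and `nan in "y"` then raised TypeError:

    import pandas as pd

    cell = float("nan")  # what an empty CSV cell becomes under pandas
    assert bool(cell)    # NaN is truthy, so the old check did not filter it out
    # cell in "y"        # would raise TypeError: 'in <string>' requires string as left operand

    # The new guard screens out NaN and whitespace-only values first
    for value in [cell, "", "  ", "y", "Y"]:
        required = bool(pd.notna(value) and str(value).strip())
        print(repr(value), "->", required)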

@@ -256,6 +265,16 @@ def process_choices(choices_str, field_name):
 
 def parse_html(input_string, default_language="en"):
     result = {}
+
+    # Handle non-string input
+    if not isinstance(input_string, str):
+        if pd.isna(input_string):  # Handle NaN values
+            return {default_language: ""}
+        try:
+            input_string = str(input_string)
+        except:
+            return {default_language: str(input_string)}
+
     soup = BeautifulSoup(input_string, "html.parser")
 
     lang_elements = soup.find_all(True, {"lang": True})
@@ -268,9 +287,7 @@ def parse_html(input_string, default_language="en"):
     if not result:  # If no text was extracted
         result[default_language] = soup.get_text(strip=True)
     else:
-        result[default_language] = soup.get_text(
-            strip=True
-        )  # Use the entire text as default language text
+        result[default_language] = soup.get_text(strip=True)  # Use the entire text as default language text
     return result
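
Note: a minimal sketch of the new parse_html guard in isolation (hypothetical helper name, bs4 assumed installed). A missing "Field Label" cell reaches the function as NaN, which BeautifulSoup cannot parse, so the guard returns an empty translation dict instead of crashing:

    import pandas as pd
    from bs4 import BeautifulSoup

    def parse_html_guard(input_string, default_language="en"):
        # Mirrors only the guard added in this commit, not the full function
        if not isinstance(input_string, str):
            if pd.isna(input_string):
                return {default_language: ""}
            input_string = str(input_string)
        soup = BeautifulSoup(input_string, "html.parser")
        return {default_language: soup.get_text(strip=True)}

    print(parse_html_guard(float("nan")))   # {'en': ''}
    print(parse_html_guard("<b>Age</b>"))   # {'en': 'Age'}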


@@ -508,59 +525,74 @@ def parse_language_iso_codes(input_string):
     ]
 
 
-def process_csv_with_pandas(
+def process_csv(
     csv_file, abs_folder_path, schema_context_url, protocol_name
 ):
     datas = {}
     order = {}
     compute = {}
     languages = []
 
-    df = pd.read_csv(csv_file, encoding="utf-8")
-    df = df.applymap(
-        lambda x: x.strip() if isinstance(x, str) else x
-    )  # Clean headers
+    # Read CSV with explicit BOM handling, and maintain original order
+    df = pd.read_csv(csv_file, encoding="utf-8-sig")  # utf-8-sig handles BOM automatically
 
-    for form_name, group in df.groupby("Form Name"):
-        datas[form_name] = group.to_dict(orient="records")
+    # Clean column names (headers)
+    df.columns = df.columns.map(lambda x: x.strip().strip('"').lstrip("\ufeff"))
+
+    # Clean string values in the dataframe
+    object_columns = df.select_dtypes(include=['object']).columns
+    for col in object_columns:
+        df[col] = df[col].astype(str).replace('nan', '')
+
+    # Initialize structures for each unique form
+    unique_forms = df["Form Name"].unique()
+    for form_name in unique_forms:
+        datas[form_name] = []
         order[form_name] = []
         compute[form_name] = []
         os.makedirs(
             f"{abs_folder_path}/activities/{form_name}/items", exist_ok=True
         )
 
-        # TODO: should we bring back the language
-        # if not languages:
-        #     languages = parse_language_iso_codes(row["Field Label"])
-
-        for _, row in group.iterrows():
-            field_name = row["Variable / Field Name"]
-            if row.get("Field Type", "") in COMPUTE_LIST:
-                # TODO: this right now doesn't give jsExpression
-                condition = normalize_condition(
-                    row["Choices, Calculations, OR Slider Labels"],
-                    field_type=row["Field Type"],
-                )
-                compute[form_name].append(
-                    {
-                        "variableName": field_name,
-                        "jsExpression": condition,
-                    }
-                )
-            elif "@CALCTEXT" in row.get("Field Annotation", "").upper():
-                calc_text = row["Field Annotation"]
-                match = re.search(r"@CALCTEXT\((.*)\)", calc_text)
-                if match:
-                    js_expression = match.group(1)
-                    js_expression = normalize_condition(js_expression)
-                    compute[form_name].append(
-                        {
-                            "variableName": field_name,
-                            "jsExpression": js_expression,
-                        }
-                    )
-                else:
-                    order[form_name].append(f"items/{field_name}")
-            else:
-                order[form_name].append(f"items/{field_name}")
+    # TODO: should we bring back the language
+    # if not languages:
+    #     languages = parse_language_iso_codes(row["Field Label"])
+
+    # Process rows in original order
+    for _, row in df.iterrows():
+        form_name = row["Form Name"]
+        field_name = row["Variable / Field Name"]
+        field_type = row.get("Field Type", "")
+        field_annotation = row.get("Field Annotation")
+
+        # Add row data to datas dictionary
+        datas[form_name].append(row.to_dict())
+
+        if field_type in COMPUTE_LIST:
+            condition = normalize_condition(
+                row["Choices, Calculations, OR Slider Labels"],
+                field_type=field_type,
+            )
+            compute[form_name].append(
+                {
+                    "variableName": field_name,
+                    "jsExpression": condition,
+                }
+            )
+        elif isinstance(field_annotation, str) and "@CALCTEXT" in field_annotation.upper():
+            calc_text = field_annotation
+            match = re.search(r"@CALCTEXT\((.*)\)", calc_text)
+            if match:
+                js_expression = match.group(1)
+                js_expression = normalize_condition(js_expression)
+                compute[form_name].append(
+                    {
+                        "variableName": field_name,
+                        "jsExpression": js_expression,
+                    }
+                )
+            else:
+                order[form_name].append(f"items/{field_name}")
+        else:
+            order[form_name].append(f"items/{field_name}")
 
     os.makedirs(f"{abs_folder_path}/{protocol_name}", exist_ok=True)
     return datas, order, compute, languages
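
Note: a small sketch of the two cleaning steps the rewritten process_csv relies on, using a hypothetical in-memory CSV. utf-8-sig strips a leading BOM at read time, and the astype(str).replace('nan', '') pass turns missing cells into empty strings (at the cost of clobbering any cell that literally contains the text "nan"):

    import io
    import pandas as pd

    csv_bytes = b'\xef\xbb\xbf"Variable / Field Name","Form Name","Field Type"\nage,demo,text\nbmi,demo,\n'
    df = pd.read_csv(io.BytesIO(csv_bytes), encoding="utf-8-sig")
    assert df.columns[0] == "Variable / Field Name"  # no \ufeff residue in the header

    for col in df.select_dtypes(include=["object"]).columns:
        df[col] = df[col].astype(str).replace("nan", "")
    assert df.loc[1, "Field Type"] == ""  # the NaN cell is now an empty string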
@@ -602,7 +634,7 @@ def redcap2reproschema(
     schema_context_url = CONTEXTFILE_URL
 
     # Process the CSV file
-    datas, order, compute, _ = process_csv_with_pandas(
+    datas, order, compute, _ = process_csv(
         csv_file,
         abs_folder_path,
         schema_context_url,
27 changes: 14 additions & 13 deletions reproschema/tests/test_redcap2reproschema.py
@@ -23,13 +23,16 @@ def test_redcap2reproschema(tmpdir):
     temp_csv_file = tmpdir.join(CSV_FILE_NAME)
     temp_yaml_file = tmpdir.join(YAML_FILE_NAME)
 
-    shutil.copy(CSV_TEST_FILE, str(temp_csv_file))  # Convert to string
-    shutil.copy(YAML_TEST_FILE, str(temp_yaml_file))  # Convert to string
-    print("tmpdir: ", tmpdir)
-    # Change the current working directory to tmpdir
+    shutil.copy(CSV_TEST_FILE, str(temp_csv_file))
+    shutil.copy(YAML_TEST_FILE, str(temp_yaml_file))
+
+    # Add debug output to see the content of the CSV file
+    with open(str(temp_csv_file), 'r') as f:
+        print("CSV content:", f.read())
+
     with tmpdir.as_cwd():
         # Read YAML to find the expected output directory name
-        with open(str(temp_yaml_file), "r") as file:  # Convert to string
+        with open(str(temp_yaml_file), "r") as file:
             protocol = yaml.safe_load(file)
             protocol_name = protocol.get("protocol_name", "").replace(" ", "_")
@@ -39,12 +42,10 @@
                 "redcap2reproschema",
                 str(temp_csv_file),
                 str(temp_yaml_file),
-            ],  # Convert to string
+            ],
         )
 
-        assert (
-            result.exit_code == 0
-        ), f"The command failed to execute successfully: {result.output}"
-        assert os.path.isdir(
-            protocol_name
-        ), f"Expected output directory '{protocol_name}' does not exist"
+        print("Command output:", result.output)  # Add debug output
+
+        assert result.exit_code == 0, f"Command failed with: {result.output}"
+        assert os.path.isdir(protocol_name), f"Expected output directory '{protocol_name}' does not exist"
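
Note: for readers unfamiliar with click's test harness, a self-contained sketch of the CliRunner pattern the test uses; the toy command here is hypothetical and stands in for the project's real CLI entry point:

    import click
    from click.testing import CliRunner

    @click.command()
    @click.argument("csv_path")
    def convert(csv_path):
        click.echo(f"converted {csv_path}")

    runner = CliRunner()
    result = runner.invoke(convert, ["input.csv"])
    print("Command output:", result.output)
    assert result.exit_code == 0, f"Command failed with: {result.output}"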
