From c6aa6877c594809f61383707025d36da24ffd2d1 Mon Sep 17 00:00:00 2001 From: Yibei Chen Date: Tue, 3 Dec 2024 20:00:50 -0500 Subject: [PATCH] fix test errors after switching to pandas --- reproschema/redcap2reproschema.py | 128 ++++++++++++------- reproschema/tests/test_redcap2reproschema.py | 27 ++-- 2 files changed, 94 insertions(+), 61 deletions(-) diff --git a/reproschema/redcap2reproschema.py b/reproschema/redcap2reproschema.py index 4a87b01..3ba90d5 100644 --- a/reproschema/redcap2reproschema.py +++ b/reproschema/redcap2reproschema.py @@ -83,7 +83,7 @@ def clean_header(header): cleaned_header = {} for k, v in header.items(): # Strip BOM, whitespace, and enclosing quotation marks if present - cleaned_key = k.lstrip("\ufeff").strip().strip('"') + cleaned_key = k.lstrip("\ufeff").strip().strip('"') if isinstance(k, str) else k cleaned_header[cleaned_key] = v return cleaned_header @@ -100,6 +100,12 @@ def normalize_condition(condition_str, field_type=None): return False elif condition_str is None: return None + elif not isinstance(condition_str, str): + # Convert non-string types to string, or return as is if conversion doesn't make sense + try: + condition_str = str(condition_str) + except: + return condition_str re_parentheses = re.compile(r"\(([0-9]*)\)") re_non_gt_lt_equal = re.compile(r"([^>|<])=") @@ -138,9 +144,9 @@ def process_field_properties(data): else: condition = True - # Check Field Annotation for special flags - annotation = data.get("Field Annotation", "").upper() - if condition and ( + # Check Field Annotation for special flags - safely handle non-string values + annotation = str(data.get("Field Annotation", "")).upper() if data.get("Field Annotation") is not None else "" + if condition and isinstance(annotation, str) and ( "@READONLY" in annotation or "@HIDDEN" in annotation or "@CALCTEXT" in annotation @@ -152,12 +158,15 @@ def process_field_properties(data): "isAbout": f"items/{data['Variable / Field Name']}", "isVis": condition, } - if data["Required Field?"]: - if data["Required Field?"] in "y": + + # Handle Required Field check, accounting for NaN values and empty strings + required_field = data.get("Required Field?") + if pd.notna(required_field) and str(required_field).strip(): # Check if value is not NaN and not empty + if str(required_field).lower() == "y": prop_obj["valueRequired"] = True - else: - raise ( - f"value {data['Required Field?']} not supported yet for redcap:Required Field?" + elif str(required_field).lower() not in ["", "n"]: # Only raise error for unexpected values + raise ValueError( + f"value {required_field} not supported yet for redcap:Required Field?" ) return prop_obj @@ -256,6 +265,16 @@ def process_choices(choices_str, field_name): def parse_html(input_string, default_language="en"): result = {} + + # Handle non-string input + if not isinstance(input_string, str): + if pd.isna(input_string): # Handle NaN values + return {default_language: ""} + try: + input_string = str(input_string) + except: + return {default_language: str(input_string)} + soup = BeautifulSoup(input_string, "html.parser") lang_elements = soup.find_all(True, {"lang": True}) @@ -268,9 +287,7 @@ def parse_html(input_string, default_language="en"): if not result: # If no text was extracted result[default_language] = soup.get_text(strip=True) else: - result[default_language] = soup.get_text( - strip=True - ) # Use the entire text as default language text + result[default_language] = soup.get_text(strip=True) # Use the entire text as default language text return result @@ -508,7 +525,7 @@ def parse_language_iso_codes(input_string): ] -def process_csv_with_pandas( +def process_csv( csv_file, abs_folder_path, schema_context_url, protocol_name ): datas = {} @@ -516,51 +533,66 @@ def process_csv_with_pandas( compute = {} languages = [] - df = pd.read_csv(csv_file, encoding="utf-8") - df = df.applymap( - lambda x: x.strip() if isinstance(x, str) else x - ) # Clean headers - - for form_name, group in df.groupby("Form Name"): - datas[form_name] = group.to_dict(orient="records") + # Read CSV with explicit BOM handling, and maintain original order + df = pd.read_csv(csv_file, encoding="utf-8-sig") # utf-8-sig handles BOM automatically + + # Clean column names (headers) + df.columns = df.columns.map(lambda x: x.strip().strip('"').lstrip("\ufeff")) + + # Clean string values in the dataframe + object_columns = df.select_dtypes(include=['object']).columns + for col in object_columns: + df[col] = df[col].astype(str).replace('nan', '') + + # Initialize structures for each unique form + unique_forms = df["Form Name"].unique() + for form_name in unique_forms: + datas[form_name] = [] order[form_name] = [] compute[form_name] = [] os.makedirs( f"{abs_folder_path}/activities/{form_name}/items", exist_ok=True ) - # TODO: should we bring back the language - # if not languages: - # languages = parse_language_iso_codes(row["Field Label"]) - - for _, row in group.iterrows(): - field_name = row["Variable / Field Name"] - if row.get("Field Type", "") in COMPUTE_LIST: - # TODO: this right now doesn't give jsExpression - condition = normalize_condition( - row["Choices, Calculations, OR Slider Labels"], - field_type=row["Field Type"], - ) + # TODO: should we bring back the language + # if not languages: + # languages = parse_language_iso_codes(row["Field Label"]) + + # Process rows in original order + for _, row in df.iterrows(): + form_name = row["Form Name"] + field_name = row["Variable / Field Name"] + field_type = row.get("Field Type", "") + field_annotation = row.get("Field Annotation") + + # Add row data to datas dictionary + datas[form_name].append(row.to_dict()) + + if field_type in COMPUTE_LIST: + condition = normalize_condition( + row["Choices, Calculations, OR Slider Labels"], + field_type=field_type, + ) + compute[form_name].append( + { + "variableName": field_name, + "jsExpression": condition, + } + ) + elif isinstance(field_annotation, str) and "@CALCTEXT" in field_annotation.upper(): + calc_text = field_annotation + match = re.search(r"@CALCTEXT\((.*)\)", calc_text) + if match: + js_expression = match.group(1) + js_expression = normalize_condition(js_expression) compute[form_name].append( { "variableName": field_name, - "jsExpression": condition, + "jsExpression": js_expression, } ) - elif "@CALCTEXT" in row.get("Field Annotation", "").upper(): - calc_text = row["Field Annotation"] - match = re.search(r"@CALCTEXT\((.*)\)", calc_text) - if match: - js_expression = match.group(1) - js_expression = normalize_condition(js_expression) - compute[form_name].append( - { - "variableName": field_name, - "jsExpression": js_expression, - } - ) - else: - order[form_name].append(f"items/{field_name}") + else: + order[form_name].append(f"items/{field_name}") os.makedirs(f"{abs_folder_path}/{protocol_name}", exist_ok=True) return datas, order, compute, languages @@ -602,7 +634,7 @@ def redcap2reproschema( schema_context_url = CONTEXTFILE_URL # Process the CSV file - datas, order, compute, _ = process_csv_with_pandas( + datas, order, compute, _ = process_csv( csv_file, abs_folder_path, schema_context_url, diff --git a/reproschema/tests/test_redcap2reproschema.py b/reproschema/tests/test_redcap2reproschema.py index 5096834..634c7f0 100644 --- a/reproschema/tests/test_redcap2reproschema.py +++ b/reproschema/tests/test_redcap2reproschema.py @@ -23,13 +23,16 @@ def test_redcap2reproschema(tmpdir): temp_csv_file = tmpdir.join(CSV_FILE_NAME) temp_yaml_file = tmpdir.join(YAML_FILE_NAME) - shutil.copy(CSV_TEST_FILE, str(temp_csv_file)) # Convert to string - shutil.copy(YAML_TEST_FILE, str(temp_yaml_file)) # Convert to string - print("tmpdir: ", tmpdir) - # Change the current working directory to tmpdir + shutil.copy(CSV_TEST_FILE, str(temp_csv_file)) + shutil.copy(YAML_TEST_FILE, str(temp_yaml_file)) + + # Add debug output to see the content of the CSV file + with open(str(temp_csv_file), 'r') as f: + print("CSV content:", f.read()) + with tmpdir.as_cwd(): # Read YAML to find the expected output directory name - with open(str(temp_yaml_file), "r") as file: # Convert to string + with open(str(temp_yaml_file), "r") as file: protocol = yaml.safe_load(file) protocol_name = protocol.get("protocol_name", "").replace(" ", "_") @@ -39,12 +42,10 @@ def test_redcap2reproschema(tmpdir): "redcap2reproschema", str(temp_csv_file), str(temp_yaml_file), - ], # Convert to string + ], ) - - assert ( - result.exit_code == 0 - ), f"The command failed to execute successfully: {result.output}" - assert os.path.isdir( - protocol_name - ), f"Expected output directory '{protocol_name}' does not exist" + + print("Command output:", result.output) # Add debug output + + assert result.exit_code == 0, f"Command failed with: {result.output}" + assert os.path.isdir(protocol_name), f"Expected output directory '{protocol_name}' does not exist"