diff --git a/.github/workflows/R_CMD_check_Hades.yaml b/.github/workflows/R_CMD_check_Hades.yaml new file mode 100644 index 0000000..c40304f --- /dev/null +++ b/.github/workflows/R_CMD_check_Hades.yaml @@ -0,0 +1,171 @@ +# For help debugging build failures open an issue on the RStudio community with the 'github-actions' tag. +# https://community.rstudio.com/new-topic?category=Package%20development&tags=github-actions +on: + push: + branches: + - '**' + pull_request: + branches: + - '**' + +name: R-CMD-check + +jobs: + R-CMD-check: + runs-on: ${{ matrix.config.os }} + + name: ${{ matrix.config.os }} (${{ matrix.config.r }}) + + strategy: + fail-fast: false + matrix: + config: + - {os: windows-latest, r: 'release'} + - {os: macOS-latest, r: 'release'} + - {os: ubuntu-20.04, r: 'release', rspm: "https://packagemanager.rstudio.com/cran/__linux__/focal/latest"} + + env: + GITHUB_PAT: ${{ secrets.GH_TOKEN }} + R_REMOTES_NO_ERRORS_FROM_WARNINGS: true + RSPM: ${{ matrix.config.rspm }} + CDM5_ORACLE_CDM_SCHEMA: ${{ secrets.CDM5_ORACLE_CDM_SCHEMA }} + CDM5_ORACLE_OHDSI_SCHEMA: ${{ secrets.CDM5_ORACLE_OHDSI_SCHEMA }} + CDM5_ORACLE_PASSWORD: ${{ secrets.CDM5_ORACLE_PASSWORD }} + CDM5_ORACLE_SERVER: ${{ secrets.CDM5_ORACLE_SERVER }} + CDM5_ORACLE_USER: ${{ secrets.CDM5_ORACLE_USER }} + CDM5_POSTGRESQL_CDM_SCHEMA: ${{ secrets.CDM5_POSTGRESQL_CDM_SCHEMA }} + CDM5_POSTGRESQL_OHDSI_SCHEMA: ${{ secrets.CDM5_POSTGRESQL_OHDSI_SCHEMA }} + CDM5_POSTGRESQL_PASSWORD: ${{ secrets.CDM5_POSTGRESQL_PASSWORD }} + CDM5_POSTGRESQL_SERVER: ${{ secrets.CDM5_POSTGRESQL_SERVER }} + CDM5_POSTGRESQL_USER: ${{ secrets.CDM5_POSTGRESQL_USER }} + CDM5_SQL_SERVER_CDM_SCHEMA: ${{ secrets.CDM5_SQL_SERVER_CDM_SCHEMA }} + CDM5_SQL_SERVER_OHDSI_SCHEMA: ${{ secrets.CDM5_SQL_SERVER_OHDSI_SCHEMA }} + CDM5_SQL_SERVER_PASSWORD: ${{ secrets.CDM5_SQL_SERVER_PASSWORD }} + CDM5_SQL_SERVER_SERVER: ${{ secrets.CDM5_SQL_SERVER_SERVER }} + CDM5_SQL_SERVER_USER: ${{ secrets.CDM5_SQL_SERVER_USER }} + CDM5_REDSHIFT_CDM_SCHEMA: ${{ secrets.CDM5_REDSHIFT_CDM_SCHEMA }} + CDM5_REDSHIFT_OHDSI_SCHEMA: ${{ secrets.CDM5_REDSHIFT_OHDSI_SCHEMA }} + CDM5_REDSHIFT_PASSWORD: ${{ secrets.CDM5_REDSHIFT_PASSWORD }} + CDM5_REDSHIFT_SERVER: ${{ secrets.CDM5_REDSHIFT_SERVER }} + CDM5_REDSHIFT_USER: ${{ secrets.CDM5_REDSHIFT_USER }} + CDM5_SPARK_USER: ${{ secrets.CDM5_SPARK_USER }} + CDM5_SPARK_PASSWORD: ${{ secrets.CDM5_SPARK_PASSWORD }} + CDM5_SPARK_CONNECTION_STRING: ${{ secrets.CDM5_SPARK_CONNECTION_STRING }} + + steps: + - uses: actions/checkout@v2 + + - uses: r-lib/actions/setup-r@v2 + with: + r-version: ${{ matrix.config.r }} + + - uses: r-lib/actions/setup-tinytex@v2 + + - uses: r-lib/actions/setup-pandoc@v2 + + - name: Install system requirements + if: runner.os == 'Linux' + run: | + sudo apt-get install -y libssh-dev + Rscript -e 'install.packages("remotes")' + while read -r cmd + do + eval sudo $cmd + done < <(Rscript -e 'writeLines(remotes::system_requirements("ubuntu", "20.04"))') + + - uses: r-lib/actions/setup-r-dependencies@v2 + with: + extra-packages: any::rcmdcheck + needs: check + + - uses: r-lib/actions/check-r-package@v2 + with: + args: 'c("--no-manual", "--as-cran")' + error-on: '"warning"' + check-dir: '"check"' + + - name: Upload source package + if: success() && runner.os == 'macOS' && github.event_name != 'pull_request' && github.ref == 'refs/heads/main' + uses: actions/upload-artifact@v2 + with: + name: package_tarball + path: check/*.tar.gz + + - name: Install covr + if: runner.os == 'macOS' + run: | + install.packages("covr") + shell: Rscript {0} + + - name: Test coverage + if: runner.os == 'macOS' + run: covr::codecov() + shell: Rscript {0} + + Release: + needs: R-CMD-Check + + runs-on: macOS-latest + + env: + GH_TOKEN: ${{ secrets.GH_TOKEN }} + + if: ${{ github.event_name != 'pull_request' && github.ref == 'refs/heads/main' }} + + steps: + + - uses: actions/checkout@v2 + with: + fetch-depth: 0 + + - name: Check if version has increased + run: | + echo "new_version="$(perl compare_versions --tag) >> $GITHUB_ENV + + - name: Display new version number + if: ${{ env.new_version != '' }} + run: | + echo "${{ env.new_version }}" + + - name: Create release + if: ${{ env.new_version != '' }} + uses: actions/create-release@v1 + env: + GITHUB_TOKEN: ${{ secrets.GH_TOKEN }} + with: + tag_name: ${{ env.new_version }} + release_name: Release ${{ env.new_version }} + body: | + See NEWS.md for release notes. + draft: false + prerelease: false + + - uses: r-lib/actions/setup-r@v2 + if: ${{ env.new_version != '' }} + + - name: Install drat + if: ${{ env.new_version != '' }} + run: | + install.packages('drat') + shell: Rscript {0} + + - name: Remove any tarballs that already exists + if: ${{ env.new_version != '' }} + run: | + rm -f *.tar.gz + + - name: Download package tarball + if: ${{ env.new_version != '' }} + uses: actions/download-artifact@v2 + with: + name: package_tarball + + - name: Push to drat + if: ${{ env.new_version != '' }} + run: | + bash deploy.sh + + - name: Push to BroadSea + if: ${{ env.new_version != '' }} + run: | + curl --data "build=true" -X POST https://registry.hub.docker.com/u/ohdsi/broadsea-methodslibrary/trigger/f0b51cec-4027-4781-9383-4b38b42dd4f5/ diff --git a/.github/workflows/nightly_cleanup_Hades.yml b/.github/workflows/nightly_cleanup_Hades.yml new file mode 100644 index 0000000..80af7c7 --- /dev/null +++ b/.github/workflows/nightly_cleanup_Hades.yml @@ -0,0 +1,18 @@ +name: 'nightly artifacts cleanup' +on: + schedule: + - cron: '0 1 * * *' # every night at 1 am UTC + +jobs: + remove-old-artifacts: + runs-on: ubuntu-latest + timeout-minutes: 10 + + steps: + - name: Remove old artifacts + uses: c-hive/gha-remove-artifacts@v1 + with: + age: '7 days' + # Optional inputs + # skip-tags: true + skip-recent: 1 diff --git a/.gitignore b/.gitignore index dcdc045..49bf4c3 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ .Ruserdata *.tmp .Rprofile +errorReportSql.txt diff --git a/DESCRIPTION b/DESCRIPTION index 74331cd..47aa76a 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,9 +1,13 @@ Package: CohortAlgebra Type: Package -Title: Cohort Algebra to create new cohort(s) from existing cohorts. +Title: Cohort Algebra to create new cohort(s) from existing cohorts Version: 0.0.1 Date: 2022-07-07 -Author: Gowtham Rao [aut, cre] +Authors@R: c( + person("Gowtham", "Rao", email = "rao@ohdsi.org", role = c("aut", "cre")), + person("Adam", "Black", email = "black@ohdsi.org", role = c("aut")), + person("Observational Health Data Science and Informatics", role = c("cph")) + ) Maintainer: Gowtham Rao Description: An R package that creates new cohort(s) from previously instantiated cohorts. Depends: @@ -11,12 +15,20 @@ Depends: R (>= 4.1.0) Imports: checkmate, - rlang, - testthat + dplyr, + ParallelLogger, + rlang Suggests: - testthat + remotes, + rmarkdown, + knitr, + testthat, + withr +Remotes: + ohdsi/ParallelLogger License: Apache License RoxygenNote: 7.2.0 +VignetteBuilder: knitr Roxygen: list(markdown = TRUE) Encoding: UTF-8 Language: en-US diff --git a/NAMESPACE b/NAMESPACE index 846c9fa..ee804b9 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -1,4 +1,8 @@ # Generated by roxygen2: do not edit by hand +export(eraFyCohorts) +export(intersectCohorts) +export(minusCohorts) +import(DatabaseConnector) import(dplyr) importFrom(rlang,.data) diff --git a/R/CohortAlgebra.R b/R/CohortAlgebra.R index 5870862..61d8b9d 100644 --- a/R/CohortAlgebra.R +++ b/R/CohortAlgebra.R @@ -17,6 +17,7 @@ #' @keywords internal "_PACKAGE" +#' @import DatabaseConnector #' @import dplyr #' @importFrom rlang .data NULL diff --git a/R/CopyCohortsToTempTable.R b/R/CopyCohortsToTempTable.R new file mode 100644 index 0000000..9a14045 --- /dev/null +++ b/R/CopyCohortsToTempTable.R @@ -0,0 +1,79 @@ +# Copyright 2022 Observational Health Data Sciences and Informatics +# +# This file is part of CohortAlgebra +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#' Get cohort ids in table +#' +#' @description +#' Get cohort ids in table. This function is not exported. +#' +#' @template Connection +#' +#' @template OldToNewCohortId +#' +#' @template TempEmulationSchema +#' +#' @param sourceCohortDatabaseSchema The database schema of the source cohort table. +#' +#' @param sourceCohortTable The name of the source cohort table. +#' +#' @param targetCohortTable A temp table to copy the cohorts from the source table. +#' +#' @return +#' NULL +#' +copyCohortsToTempTable <- function(connection = NULL, + oldToNewCohortId, + sourceCohortDatabaseSchema = NULL, + sourceCohortTable, + targetCohortTable = "#cohort_rows", + tempEmulationSchema = getOption("sqlRenderTempEmulationSchema")) { + DatabaseConnector::insertTable( + connection = connection, + tableName = "#old_to_new_cohort_id", + createTable = TRUE, + dropTableIfExists = TRUE, + tempTable = TRUE, + tempEmulationSchema = tempEmulationSchema, + progressBar = FALSE, + bulkLoad = (Sys.getenv("bulkLoad") == TRUE), + camelCaseToSnakeCase = TRUE, + data = oldToNewCohortId + ) + + sqlCopyCohort <- " + DROP TABLE IF EXISTS @target_cohort_table; + SELECT target.new_cohort_id cohort_definition_id, + source.subject_id, + source.cohort_start_date, + source.cohort_end_date + INTO @target_cohort_table + FROM {@source_database_schema != ''} ? {@source_database_schema.@source_cohort_table} : {@source_cohort_table} source + INNER JOIN #old_to_new_cohort_id target + ON source.cohort_definition_id = target.old_cohort_id + ;" + + DatabaseConnector::renderTranslateExecuteSql( + connection = connection, + sql = sqlCopyCohort, + profile = FALSE, + progressBar = FALSE, + reportOverallTime = FALSE, + source_database_schema = sourceCohortDatabaseSchema, + source_cohort_table = sourceCohortTable, + target_cohort_table = targetCohortTable, + tempEmulationSchema = tempEmulationSchema + ) +} diff --git a/R/DeleteCohortRecords.R b/R/DeleteCohortRecords.R new file mode 100644 index 0000000..f32ef63 --- /dev/null +++ b/R/DeleteCohortRecords.R @@ -0,0 +1,83 @@ +# Copyright 2022 Observational Health Data Sciences and Informatics +# +# This file is part of CohortAlgebra +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#' Delete cohort records. +#' +#' @description +#' Delete all records from cohort table with the given cohort id. Edit privileges +#' to the cohort table is required. +#' +#' @template ConnectionDetails +#' +#' @template Connection +#' +#' @template CohortTable +#' +#' @template CohortIds +#' +#' @template TempEmulationSchema +#' +#' @return +#' NULL +#' +deleteCohortRecords <- function(connectionDetails = NULL, + connection = NULL, + cohortDatabaseSchema, + cohortTable = "cohort", + tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), + cohortIds) { + errorMessages <- checkmate::makeAssertCollection() + checkmate::assertIntegerish( + x = cohortIds, + min.len = 1, + null.ok = FALSE, + add = errorMessages + ) + checkmate::assertCharacter( + x = cohortDatabaseSchema, + min.chars = 1, + len = 1, + null.ok = TRUE, + add = errorMessages + ) + checkmate::assertCharacter( + x = cohortTable, + min.chars = 1, + len = 1, + null.ok = FALSE, + add = errorMessages + ) + checkmate::reportAssertions(collection = errorMessages) + + if (is.null(connection)) { + connection <- DatabaseConnector::connect(connectionDetails) + on.exit(DatabaseConnector::disconnect(connection)) + } + + DatabaseConnector::renderTranslateExecuteSql( + connection = connection, + sql = " DELETE + FROM {@cohort_database_schema != ''} ? {@cohort_database_schema.@cohort_table} : {@cohort_table} + WHERE cohort_definition_id IN (@cohort_ids);", + profile = FALSE, + progressBar = FALSE, + reportOverallTime = FALSE, + cohort_database_schema = cohortDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + cohort_table = cohortTable, + cohort_ids = cohortIds + ) +} diff --git a/R/EraFyCohort.R b/R/EraFyCohort.R new file mode 100644 index 0000000..58c8942 --- /dev/null +++ b/R/EraFyCohort.R @@ -0,0 +1,275 @@ +# Copyright 2022 Observational Health Data Sciences and Informatics +# +# This file is part of CohortAlgebra +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#' Era-fy cohort(s) +#' +#' @description +#' Given a table with cohort_definition_id, subject_id, cohort_start_date, +#' cohort_end_date execute era logic. This will delete and replace the +#' original rows with the cohort_definition_id(s). edit privileges +#' to the cohort table is required. +#' +#' @template ConnectionDetails +#' +#' @template Connection +#' +#' @template CohortTable +#' +#' @template OldToNewCohortId +#' +#' @template PurgeConflicts +#' +#' @template TempEmulationSchema +#' +#' @param eraconstructorpad Optional value to pad cohort era construction logic. Default = 0. i.e. no padding. +#' +#' @return +#' NULL +#' +#' @export +eraFyCohorts <- function(connectionDetails = NULL, + connection = NULL, + cohortDatabaseSchema = NULL, + cohortTable = "cohort", + oldToNewCohortId, + eraconstructorpad = 0, + tempEmulationSchema = getOption("sqlRenderTempEmulationSchema"), + purgeConflicts = FALSE) { + errorMessages <- checkmate::makeAssertCollection() + checkmate::assertDataFrame( + x = oldToNewCohortId, + min.rows = 1, + add = errorMessages + ) + checkmate::assertNames( + x = colnames(oldToNewCohortId), + must.include = c("oldCohortId", "newCohortId"), + add = errorMessages + ) + checkmate::assertIntegerish( + x = oldToNewCohortId$oldCohortId, + min.len = 1, + null.ok = FALSE, + add = errorMessages + ) + checkmate::assertIntegerish( + x = oldToNewCohortId$newCohortId, + min.len = 1, + null.ok = FALSE, + add = errorMessages + ) + checkmate::assertCharacter( + x = cohortDatabaseSchema, + min.chars = 1, + len = 1, + null.ok = TRUE, + add = errorMessages + ) + checkmate::assertCharacter( + x = cohortTable, + min.chars = 1, + len = 1, + null.ok = FALSE, + add = errorMessages + ) + checkmate::assertLogical( + x = purgeConflicts, + any.missing = FALSE, + min.len = 1, + add = errorMessages + ) + checkmate::reportAssertions(collection = errorMessages) + + if (is.null(connection)) { + connection <- DatabaseConnector::connect(connectionDetails) + on.exit(DatabaseConnector::disconnect(connection)) + } + + cohortIdsInCohortTable <- + getCohortIdsInCohortTable( + connection = connection, + cohortDatabaseSchema = cohortDatabaseSchema, + cohortTable = cohortTable, + tempEmulationSchema = tempEmulationSchema + ) + + conflicitingCohortIdsInTargetCohortTable <- + intersect( + x = oldToNewCohortId$newCohortId %>% unique(), + y = cohortIdsInCohortTable %>% unique() + ) + + performPurgeConflicts <- FALSE + if (length(conflicitingCohortIdsInTargetCohortTable) > 0) { + if (purgeConflicts) { + performPurgeConflicts <- TRUE + } else { + stop( + paste0( + "The following cohortIds already exist in the target cohort table, causing conflicts :", + paste0(conflicitingCohortIdsInTargetCohortTable, collapse = ",") + ) + ) + } + } + + tempTableName <- generateRandomString() + tempTable1 <- paste0("#", tempTableName, "1") + tempTable2 <- paste0("#", tempTableName, "2") + + copyCohortsToTempTable( + connection = connection, + oldToNewCohortId = oldToNewCohortId, + sourceCohortDatabaseSchema = cohortDatabaseSchema, + sourceCohortTable = cohortTable, + targetCohortTable = tempTable1 + ) + + sqlEraFy <- " DROP TABLE IF EXISTS @temp_table_2; + with cteEndDates (cohort_definition_id, subject_id, cohort_end_date) AS -- the magic + ( + SELECT + cohort_definition_id + , subject_id + , DATEADD(day,-1 * @eraconstructorpad, event_date) as cohort_end_date + FROM + ( + SELECT + cohort_definition_id + , subject_id + , event_date + , event_type + , MAX(start_ordinal) OVER (PARTITION BY cohort_definition_id, subject_id + ORDER BY event_date, event_type, start_ordinal ROWS UNBOUNDED PRECEDING) AS start_ordinal + , ROW_NUMBER() OVER (PARTITION BY cohort_definition_id, subject_id + ORDER BY event_date, event_type, start_ordinal) AS overall_ord + FROM + ( + SELECT + cohort_definition_id + , subject_id + , cohort_start_date AS event_date + , -1 AS event_type + , ROW_NUMBER() OVER (PARTITION BY cohort_definition_id, subject_id ORDER BY cohort_start_date) AS start_ordinal + FROM @temp_table_1 + + UNION ALL + + + SELECT + cohort_definition_id + , subject_id + , DATEADD(day,@eraconstructorpad,cohort_end_date) as cohort_end_date + , 1 AS event_type + , NULL + FROM @temp_table_1 + ) RAWDATA + ) e + WHERE (2 * e.start_ordinal) - e.overall_ord = 0 + ), + cteEnds (cohort_definition_id, subject_id, cohort_start_date, cohort_end_date) AS + ( + SELECT + c. cohort_definition_id + , c.subject_id + , c.cohort_start_date + , MIN(e.cohort_end_date) AS cohort_end_date + FROM @temp_table_1 c + JOIN cteEndDates e ON c.cohort_definition_id = e.cohort_definition_id AND + c.subject_id = e.subject_id AND + e.cohort_end_date >= c.cohort_start_date + GROUP BY c.cohort_definition_id, c.subject_id, c.cohort_start_date + ) + select cohort_definition_id, subject_id, min(cohort_start_date) as cohort_start_date, cohort_end_date + into @temp_table_2 + from cteEnds + group by cohort_definition_id, subject_id, cohort_end_date + ; + " + + DatabaseConnector::renderTranslateExecuteSql( + connection = connection, + sql = sqlEraFy, + profile = FALSE, + progressBar = FALSE, + reportOverallTime = FALSE, + tempEmulationSchema = tempEmulationSchema, + eraconstructorpad = eraconstructorpad, + temp_table_1 = tempTable1, + temp_table_2 = tempTable2 + ) + + cohortIdsToDeleteFromSource <- oldToNewCohortId %>% + dplyr::filter(.data$oldCohortId == .data$newCohortId) %>% + dplyr::pull(.data$oldCohortId) + + if (length(cohortIdsToDeleteFromSource) > 0) { + ParallelLogger::logTrace( + paste0( + "The following cohortIds will be deleted from your cohort table and \n", + " replaced with ear fy'd version of those cohorts using the same original cohort id: ", + paste0(cohortIdsToDeleteFromSource, collapse = ",") + ) + ) + deleteCohortRecords( + connection = connection, + cohortDatabaseSchema = cohortDatabaseSchema, + cohortTable = cohortTable, + cohortIds = cohortIdsToDeleteFromSource + ) + } + + if (performPurgeConflicts) { + ParallelLogger::logTrace( + paste0( + "The following conflicting cohortIds will be deleted from your cohort table \n", + " as part resolving conflicts: ", + paste0(conflicitingCohortIdsInTargetCohortTable, collapse = ",") + ) + ) + deleteCohortRecords( + connection = connection, + cohortDatabaseSchema = cohortDatabaseSchema, + cohortTable = cohortTable, + cohortIds = conflicitingCohortIdsInTargetCohortTable + ) + } + DatabaseConnector::renderTranslateExecuteSql( + connection = connection, + sql = " INSERT INTO {@cohort_database_schema != ''} ? {@cohort_database_schema.@cohort_table} : {@cohort_table} + SELECT cohort_definition_id, subject_id, cohort_start_date, cohort_end_date + FROM @temp_table_2;", + profile = FALSE, + progressBar = FALSE, + reportOverallTime = FALSE, + cohort_database_schema = cohortDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + cohort_table = cohortTable, + temp_table_2 = tempTable2 + ) + + DatabaseConnector::renderTranslateExecuteSql( + connection = connection, + sql = " DROP TABLE IF EXISTS @temp_table_1; + DROP TABLE IF EXISTS @temp_table_2; + DROP TABLE IF EXISTS #old_to_new_cohort_id;", + profile = FALSE, + progressBar = FALSE, + reportOverallTime = FALSE, + temp_table_1 = tempTable1, + temp_table_2 = tempTable2 + ) +} diff --git a/R/GetCohortIdsInCohortTable.R b/R/GetCohortIdsInCohortTable.R new file mode 100644 index 0000000..edaf363 --- /dev/null +++ b/R/GetCohortIdsInCohortTable.R @@ -0,0 +1,49 @@ +# Copyright 2022 Observational Health Data Sciences and Informatics +# +# This file is part of CohortAlgebra +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#' Get cohort ids in table +#' +#' @description +#' Get cohort ids in table +#' +#' @template Connection +#' +#' @template CohortTable +#' +#' @template TempEmulationSchema +#' +#' @return +#' NULL +#' +getCohortIdsInCohortTable <- function(connection = NULL, + cohortDatabaseSchema = NULL, + cohortTable, + tempEmulationSchema = getOption("sqlRenderTempEmulationSchema")) { + cohortIdsInCohortTable <- + DatabaseConnector::renderTranslateQuerySql( + connection = connection, + sql = "SELECT DISTINCT cohort_definition_id cohort_definition_id + FROM {@cohort_database_schema != ''} ? {@cohort_database_schema.@cohort_table} : {@cohort_table};", + snakeCaseToCamelCase = TRUE, + cohort_database_schema = cohortDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + cohort_table = cohortTable + ) %>% + dplyr::pull(.data$cohortDefinitionId) %>% + unique() %>% + sort() + return(cohortIdsInCohortTable) +} diff --git a/R/IntersectCohorts.R b/R/IntersectCohorts.R new file mode 100644 index 0000000..8b2d1ec --- /dev/null +++ b/R/IntersectCohorts.R @@ -0,0 +1,264 @@ +# Copyright 2022 Observational Health Data Sciences and Informatics +# +# This file is part of CohortAlgebra +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +#' Intersect cohort(s) +#' +#' @description +#' Find the common cohort period for persons present in all the cohorts. Note: if +#' subject is not found in any of the cohorts, then they will not +#' be in the final cohort. +#' +#' @template ConnectionDetails +#' +#' @template Connection +#' +#' @template CohortTable +#' +#' @template CohortIds +#' +#' @template NewCohortId +#' +#' @template PurgeConflicts +#' +#' @template TempEmulationSchema +#' +#' @return +#' NULL +#' +#' @export +intersectCohorts <- function(connectionDetails = NULL, + connection = NULL, + cohortDatabaseSchema = NULL, + cohortTable = "cohort", + cohortIds, + newCohortId, + purgeConflicts = FALSE, + tempEmulationSchema = getOption("sqlRenderTempEmulationSchema")) { + errorMessages <- checkmate::makeAssertCollection() + checkmate::assertIntegerish( + x = cohortIds, + min.len = 1, + null.ok = FALSE, + add = errorMessages + ) + checkmate::assertIntegerish( + x = newCohortId, + len = 1, + null.ok = FALSE, + add = errorMessages + ) + checkmate::assertCharacter( + x = cohortDatabaseSchema, + min.chars = 1, + len = 1, + null.ok = TRUE, + add = errorMessages + ) + checkmate::assertCharacter( + x = cohortTable, + min.chars = 1, + len = 1, + null.ok = FALSE, + add = errorMessages + ) + checkmate::assertLogical( + x = purgeConflicts, + any.missing = FALSE, + min.len = 1, + add = errorMessages + ) + checkmate::reportAssertions(collection = errorMessages) + + if (is.null(connection)) { + connection <- DatabaseConnector::connect(connectionDetails) + on.exit(DatabaseConnector::disconnect(connection)) + } + + cohortIdsInCohortTable <- + getCohortIdsInCohortTable( + connection = connection, + cohortDatabaseSchema = cohortDatabaseSchema, + cohortTable = cohortTable, + tempEmulationSchema = tempEmulationSchema + ) + + conflicitingCohortIdsInTargetCohortTable <- + intersect( + x = newCohortId %>% unique(), + y = cohortIdsInCohortTable %>% unique() + ) + + + performPurgeConflicts <- FALSE + if (length(conflicitingCohortIdsInTargetCohortTable) > 0) { + if (purgeConflicts) { + performPurgeConflicts <- TRUE + } else { + stop( + paste0( + "The following cohortIds already exist in the target cohort table, causing conflicts :", + paste0(conflicitingCohortIdsInTargetCohortTable, collapse = ",") + ) + ) + } + } + + tempTableName <- generateRandomString() + tempTable1 <- paste0("#", tempTableName, "1") + tempTable2 <- paste0("#", tempTableName, "2") + + copyCohortsToTempTable( + connection = connection, + oldToNewCohortId = dplyr::tibble(oldCohortId = cohortIds) %>% + dplyr::mutate(newCohortId = .data$oldCohortId) %>% + dplyr::distinct(), + sourceCohortDatabaseSchema = cohortDatabaseSchema, + sourceCohortTable = cohortTable, + targetCohortTable = tempTable1 + ) + + eraFyCohorts( + connection = connection, + oldToNewCohortId = dplyr::tibble(oldCohortId = cohortIds) %>% + dplyr::mutate(newCohortId = .data$oldCohortId) %>% + dplyr::distinct(), + cohortTable = tempTable1, + purgeConflicts = TRUE + ) + + numberOfCohorts <- length(cohortIds %>% unique()) + + intersectSql <- "DROP TABLE IF EXISTS @temp_table_2; + + WITH cohort_dates + AS ( + SELECT subject_id, + cohort_date, + -- LEAD will ignore values that are same (e.g. if cohort_start_date = cohort_end_date) + ROW_NUMBER() OVER(PARTITION BY subject_id + ORDER BY cohort_date ASC) cohort_date_seq + FROM ( + SELECT subject_id, + cohort_start_date cohort_date + FROM @temp_table_1 + + UNION ALL -- we need all dates, even if duplicates + + SELECT subject_id, + cohort_end_date cohort_date + FROM @temp_table_1 + ) all_dates + ), + candidate_periods + AS ( + SELECT + subject_id, + cohort_date candidate_start_date, + cohort_date_seq, + LEAD(cohort_date, 1) OVER ( + PARTITION BY subject_id ORDER BY cohort_date, cohort_date_seq ASC + ) candidate_end_date + FROM cohort_dates + GROUP BY subject_id, + cohort_date, + cohort_date_seq + ), + candidate_cohort_date + AS ( + SELECT cohort.*, + candidate_start_date, + candidate_end_date + FROM @temp_table_1 cohort + INNER JOIN candidate_periods candidate ON cohort.subject_id = candidate.subject_id + AND candidate_start_date >= cohort_start_date + AND candidate_end_date <= cohort_end_date + ) + SELECT @new_cohort_id cohort_definition_id, + subject_id, + candidate_start_date cohort_start_date, + candidate_end_date cohort_end_date + INTO @temp_table_2 + FROM candidate_cohort_date + GROUP BY subject_id, + candidate_start_date, + candidate_end_date + HAVING COUNT(*) = @number_of_cohorts;" + + DatabaseConnector::renderTranslateExecuteSql( + connection = connection, + sql = intersectSql, + profile = FALSE, + progressBar = FALSE, + reportOverallTime = FALSE, + number_of_cohorts = numberOfCohorts, + new_cohort_id = newCohortId, + tempEmulationSchema = tempEmulationSchema, + temp_table_1 = tempTable1, + temp_table_2 = tempTable2 + ) + + suppressMessages( + eraFyCohorts( + connection = connection, + oldToNewCohortId = dplyr::tibble(oldCohortId = newCohortId) %>% + dplyr::mutate(newCohortId = .data$oldCohortId) %>% + dplyr::distinct(), + cohortTable = tempTable2, + purgeConflicts = TRUE + ) + ) + + if (performPurgeConflicts) { + ParallelLogger::logTrace( + paste0( + "The following conflicting cohortIds will be deleted from your cohort table \n", + " as part resolving conflicts: ", + paste0(conflicitingCohortIdsInTargetCohortTable, collapse = ",") + ) + ) + deleteCohortRecords( + connection = connection, + cohortDatabaseSchema = cohortDatabaseSchema, + cohortTable = cohortTable, + cohortIds = conflicitingCohortIdsInTargetCohortTable + ) + } + DatabaseConnector::renderTranslateExecuteSql( + connection = connection, + sql = " INSERT INTO {@cohort_database_schema != ''} ? {@cohort_database_schema.@cohort_table} : {@cohort_table} + SELECT cohort_definition_id, subject_id, cohort_start_date, cohort_end_date + FROM @temp_table_2;", + profile = FALSE, + progressBar = FALSE, + reportOverallTime = FALSE, + cohort_database_schema = cohortDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + cohort_table = cohortTable, + temp_table_2 = tempTable2 + ) + + DatabaseConnector::renderTranslateExecuteSql( + connection = connection, + sql = " DROP TABLE IF EXISTS @temp_table_1; + DROP TABLE IF EXISTS @temp_table_2;", + profile = FALSE, + progressBar = FALSE, + reportOverallTime = FALSE, + temp_table_1 = tempTable1, + temp_table_2 = tempTable2 + ) +} diff --git a/R/MinusCohorts.R b/R/MinusCohorts.R new file mode 100644 index 0000000..afe0cf8 --- /dev/null +++ b/R/MinusCohorts.R @@ -0,0 +1,310 @@ +# Copyright 2022 Observational Health Data Sciences and Informatics +# +# This file is part of CohortAlgebra +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +#' Minus cohort(s) +#' +#' @description +#' Given two cohorts, substract (minus) the dates from the first cohort, the +#' dates the subject also had on the second cohort. +#' +#' @template ConnectionDetails +#' +#' @template Connection +#' +#' @template CohortTable +#' +#' @param firstCohortId The cohort id of the cohort from which to substract. +#' +#' @param secondCohortId The cohort id of the cohort that is used to substract. +#' +#' @template NewCohortId +#' +#' @template PurgeConflicts +#' +#' @template TempEmulationSchema +#' +#' @return +#' NULL +#' +#' @export +minusCohorts <- function(connectionDetails = NULL, + connection = NULL, + cohortDatabaseSchema = NULL, + cohortTable = "cohort", + firstCohortId, + secondCohortId, + newCohortId, + purgeConflicts = FALSE, + tempEmulationSchema = getOption("sqlRenderTempEmulationSchema")) { + errorMessages <- checkmate::makeAssertCollection() + checkmate::assertIntegerish( + x = firstCohortId, + len = 1, + null.ok = FALSE, + add = errorMessages + ) + checkmate::assertIntegerish( + x = secondCohortId, + len = 1, + null.ok = FALSE, + add = errorMessages + ) + checkmate::assertIntegerish( + x = newCohortId, + len = 1, + null.ok = FALSE, + add = errorMessages + ) + checkmate::assertCharacter( + x = cohortDatabaseSchema, + min.chars = 1, + len = 1, + null.ok = TRUE, + add = errorMessages + ) + checkmate::assertCharacter( + x = cohortTable, + min.chars = 1, + len = 1, + null.ok = FALSE, + add = errorMessages + ) + checkmate::assertLogical( + x = purgeConflicts, + any.missing = FALSE, + min.len = 1, + add = errorMessages + ) + checkmate::reportAssertions(collection = errorMessages) + + if (firstCohortId == secondCohortId) { + warning( + "During minus operation, both first and second cohorts have the same cohort id. The result will be a NULL cohort." + ) + } + + if (is.null(connection)) { + connection <- DatabaseConnector::connect(connectionDetails) + on.exit(DatabaseConnector::disconnect(connection)) + } + + cohortIdsInCohortTable <- + getCohortIdsInCohortTable( + connection = connection, + cohortDatabaseSchema = cohortDatabaseSchema, + cohortTable = cohortTable, + tempEmulationSchema = tempEmulationSchema + ) + + conflicitingCohortIdsInTargetCohortTable <- + intersect( + x = newCohortId %>% unique(), + y = cohortIdsInCohortTable %>% unique() + ) + + performPurgeConflicts <- FALSE + if (length(conflicitingCohortIdsInTargetCohortTable) > 0) { + if (purgeConflicts) { + performPurgeConflicts <- TRUE + } else { + stop( + paste0( + "The following cohortIds already exist in the target cohort table, causing conflicts :", + paste0(conflicitingCohortIdsInTargetCohortTable, collapse = ",") + ) + ) + } + } + + tempTableName <- generateRandomString() + tempTable1 <- paste0("#", tempTableName, "1") + tempTable2 <- paste0("#", tempTableName, "2") + tempTable3 <- paste0("#", tempTableName, "3") + + copyCohortsToTempTable( + connection = connection, + oldToNewCohortId = dplyr::tibble(oldCohortId = c(firstCohortId, secondCohortId)) %>% + dplyr::mutate(newCohortId = .data$oldCohortId) %>% + dplyr::distinct(), + sourceCohortDatabaseSchema = cohortDatabaseSchema, + sourceCohortTable = cohortTable, + targetCohortTable = tempTable1 + ) + + intersectCohorts( + connection = connection, + cohortTable = tempTable1, + cohortIds = c(firstCohortId, secondCohortId), + newCohortId = -999, + purgeConflicts = FALSE, + tempEmulationSchema = getOption("sqlRenderTempEmulationSchema") + ) + + minusSql <- "DROP TABLE IF EXISTS @temp_table_2; + + WITH cohort_dates + AS ( + SELECT subject_id, + cohort_date, + -- LEAD will ignore values that are same (e.g. if cohort_start_date = cohort_end_date) + ROW_NUMBER() OVER(PARTITION BY subject_id + ORDER BY cohort_date ASC) cohort_date_seq + FROM ( + SELECT subject_id, + cohort_start_date cohort_date + FROM @temp_table_1 + WHERE cohort_definition_id IN (@first_cohort_id, -999) + + UNION ALL -- we need all dates, even if duplicates + + SELECT subject_id, + cohort_end_date cohort_date + FROM @temp_table_1 + WHERE cohort_definition_id IN (@first_cohort_id, -999) + ) all_dates + ), + candidate_periods + AS ( + SELECT + subject_id, + cohort_date candidate_start_date, + cohort_date_seq, + LEAD(cohort_date, 1) OVER ( + PARTITION BY subject_id ORDER BY cohort_date, cohort_date_seq ASC + ) candidate_end_date + FROM cohort_dates + GROUP BY subject_id, + cohort_date, + cohort_date_seq + ), + candidate_cohort_date + AS ( + SELECT cohort.*, + candidate_start_date, + candidate_end_date + FROM @temp_table_1 cohort + INNER JOIN candidate_periods candidate ON cohort.subject_id = candidate.subject_id + AND candidate_start_date >= cohort_start_date + AND candidate_end_date <= cohort_end_date + ) + SELECT + subject_id, + candidate_start_date, + candidate_end_date + INTO @temp_table_2 + FROM candidate_cohort_date + GROUP BY subject_id, + candidate_start_date, + candidate_end_date + HAVING COUNT(*) = 1;" + + DatabaseConnector::renderTranslateExecuteSql( + connection = connection, + sql = minusSql, + profile = FALSE, + progressBar = FALSE, + reportOverallTime = FALSE, + first_cohort_id = firstCohortId, + temp_table_1 = tempTable1, + temp_table_2 = tempTable2, + tempEmulationSchema = tempEmulationSchema + ) + + # date corrections + dateCorrectionSql <- " + DROP TABLE IF EXISTS @temp_table_3; + WITH intersect_cohort + AS ( + SELECT subject_id, + cohort_start_date, + cohort_end_date + FROM @temp_table_1 + WHERE cohort_definition_id IN (- 999) + ) + SELECT @new_cohort_id cohort_definition_id, + mc.subject_id, + CASE + WHEN cs.cohort_end_date IS NULL + THEN mc.candidate_start_date + ELSE DATEADD(DAY, 1, mc.candidate_start_date) + END AS cohort_start_date, + CASE + WHEN ce.cohort_end_date IS NULL + THEN mc.candidate_end_date + ELSE DATEADD(DAY, - 1, mc.candidate_end_date) + END AS cohort_end_date + INTO @temp_table_3 + FROM @temp_table_2 mc + LEFT JOIN intersect_cohort cs ON mc.subject_id = cs.subject_id + AND mc.candidate_start_date = cs.cohort_end_date + LEFT JOIN intersect_cohort ce ON mc.subject_id = ce.subject_id + AND mc.candidate_end_date = ce.cohort_start_date;" + + DatabaseConnector::renderTranslateExecuteSql( + connection = connection, + sql = dateCorrectionSql, + profile = FALSE, + progressBar = FALSE, + reportOverallTime = FALSE, + temp_table_1 = tempTable1, + temp_table_2 = tempTable2, + temp_table_3 = tempTable3, + new_cohort_id = newCohortId, + tempEmulationSchema = tempEmulationSchema + ) + + if (performPurgeConflicts) { + ParallelLogger::logTrace( + paste0( + "The following conflicting cohortIds will be deleted from your cohort table \n", + " as part resolving conflicts: ", + paste0(conflicitingCohortIdsInTargetCohortTable, collapse = ",") + ) + ) + deleteCohortRecords( + connection = connection, + cohortDatabaseSchema = cohortDatabaseSchema, + cohortTable = cohortTable, + cohortIds = conflicitingCohortIdsInTargetCohortTable + ) + } + DatabaseConnector::renderTranslateExecuteSql( + connection = connection, + sql = " INSERT INTO {@cohort_database_schema != ''} ? {@cohort_database_schema.@cohort_table} : {@cohort_table} + SELECT cohort_definition_id, subject_id, cohort_start_date, cohort_end_date + FROM @temp_table_3;", + profile = FALSE, + progressBar = FALSE, + reportOverallTime = FALSE, + cohort_database_schema = cohortDatabaseSchema, + tempEmulationSchema = tempEmulationSchema, + cohort_table = cohortTable, + temp_table_3 = tempTable3 + ) + + DatabaseConnector::renderTranslateExecuteSql( + connection = connection, + sql = " DROP TABLE IF EXISTS @temp_table_1; + DROP TABLE IF EXISTS @temp_table_2; + DROP TABLE IF EXISTS @temp_table_3;", + profile = FALSE, + progressBar = FALSE, + reportOverallTime = FALSE, + temp_table_1 = tempTable1, + temp_table_2 = tempTable2, + temp_table_3 = tempTable3 + ) +} diff --git a/R/Support.R b/R/Support.R new file mode 100644 index 0000000..b5fbed1 --- /dev/null +++ b/R/Support.R @@ -0,0 +1,28 @@ +# Copyright 2022 Observational Health Data Sciences and Informatics +# +# This file is part of CohortAlgebra +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +generateRandomString <- function() { + randomStringTableName <- + tolower(paste0( + "tmp_", + paste0(sample( + x = c(LETTERS, 0:9), + size = 12, + replace = TRUE + ), collapse = "") + )) + return(randomStringTableName) +} diff --git a/README.md b/README.md index 5cdf0d9..4330da0 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ CohortAlgebra Introduction ============ -THIS PACKAGE IS UNDER ACTIVE DEVELOPMENT. DO NOT USE. +THIS PACKAGE IS UNDER ACTIVE DEVELOPMENT. DO NOT USE. IT IS NOT PART OF HADES. CohortAlgebra is a R package that allows you to create new cohorts from previously instantiated cohorts in a cohort table. New cohorts may be created by performing the union, intersect or minus of one or more cohorts. diff --git a/docs/404.html b/docs/404.html index d080032..14824cb 100644 --- a/docs/404.html +++ b/docs/404.html @@ -98,7 +98,7 @@

Page not found (404)