Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Splitting up connector code and tests #5466

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 25 additions & 23 deletions src/fides/api/service/connectors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
from fides.api.models.connectionconfig import ConnectionConfig as ConnectionConfig
from fides.api.models.connectionconfig import ConnectionType as ConnectionType
from fides.api.service.connectors.base_connector import BaseConnector as BaseConnector
from fides.api.service.connectors.bigquery_connector import (
BigQueryConnector as BigQueryConnector,
)
from fides.api.service.connectors.consent_email_connector import (
GenericConsentEmailConnector,
)
Expand All @@ -29,47 +32,46 @@
from fides.api.service.connectors.fides_connector import (
FidesConnector as FidesConnector,
)
from fides.api.service.connectors.google_cloud_mysql_connector import (
GoogleCloudSQLMySQLConnector as GoogleCloudSQLMySQLConnector,
)
from fides.api.service.connectors.google_cloud_postgres_connector import (
GoogleCloudSQLPostgresConnector as GoogleCloudSQLPostgresConnector,
)
from fides.api.service.connectors.http_connector import HTTPSConnector as HTTPSConnector
from fides.api.service.connectors.manual_webhook_connector import (
ManualWebhookConnector as ManualWebhookConnector,
)
from fides.api.service.connectors.mariadb_connector import (
MariaDBConnector as MariaDBConnector,
)
from fides.api.service.connectors.microsoft_sql_server_connector import (
MicrosoftSQLServerConnector as MicrosoftSQLServerConnector,
)
from fides.api.service.connectors.mongodb_connector import (
MongoDBConnector as MongoDBConnector,
)
from fides.api.service.connectors.mysql_connector import (
MySQLConnector as MySQLConnector,
)
from fides.api.service.connectors.postgres_connector import (
PostgreSQLConnector as PostgreSQLConnector,
)
from fides.api.service.connectors.rds_mysql_connector import (
RDSMySQLConnector as RDSMySQLConnector,
)
from fides.api.service.connectors.rds_postgres_connector import (
RDSPostgresConnector as RDSPostgresConnector,
)
from fides.api.service.connectors.redshift_connector import (
RedshiftConnector as RedshiftConnector,
)
from fides.api.service.connectors.s3_connector import S3Connector
from fides.api.service.connectors.saas_connector import SaaSConnector as SaaSConnector
from fides.api.service.connectors.scylla_connector import (
ScyllaConnector as ScyllaConnector,
)
from fides.api.service.connectors.sql_connector import (
BigQueryConnector as BigQueryConnector,
)
from fides.api.service.connectors.sql_connector import (
GoogleCloudSQLMySQLConnector as GoogleCloudSQLMySQLConnector,
)
from fides.api.service.connectors.sql_connector import (
GoogleCloudSQLPostgresConnector as GoogleCloudSQLPostgresConnector,
)
from fides.api.service.connectors.sql_connector import (
MariaDBConnector as MariaDBConnector,
)
from fides.api.service.connectors.sql_connector import (
MicrosoftSQLServerConnector as MicrosoftSQLServerConnector,
)
from fides.api.service.connectors.sql_connector import MySQLConnector as MySQLConnector
from fides.api.service.connectors.sql_connector import (
PostgreSQLConnector as PostgreSQLConnector,
)
from fides.api.service.connectors.sql_connector import (
RedshiftConnector as RedshiftConnector,
)
from fides.api.service.connectors.sql_connector import (
from fides.api.service.connectors.snowflake_connector import (
SnowflakeConnector as SnowflakeConnector,
)
from fides.api.service.connectors.timescale_connector import (
Expand Down
2 changes: 1 addition & 1 deletion src/fides/api/service/connectors/base_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from fides.api.models.connectionconfig import ConnectionConfig, ConnectionTestStatus
from fides.api.models.policy import Policy
from fides.api.models.privacy_request import PrivacyRequest, RequestTask
from fides.api.service.connectors.query_config import QueryConfig
from fides.api.service.connectors.query_configs.query_config import QueryConfig
from fides.api.util.collection_util import Row
from fides.config import CONFIG

Expand Down
158 changes: 158 additions & 0 deletions src/fides/api/service/connectors/bigquery_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
from typing import List, Optional

from loguru import logger
from sqlalchemy import text
from sqlalchemy.engine import ( # type: ignore
Connection,
Engine,
LegacyCursorResult,
create_engine,
)
from sqlalchemy.orm import Session
from sqlalchemy.sql import Executable # type: ignore
from sqlalchemy.sql.elements import TextClause

from fides.api.common_exceptions import ConnectionException
from fides.api.graph.execution import ExecutionNode
from fides.api.models.connectionconfig import ConnectionTestStatus
from fides.api.models.policy import Policy
from fides.api.models.privacy_request import PrivacyRequest, RequestTask
from fides.api.schemas.connection_configuration.connection_secrets_bigquery import (
BigQuerySchema,
)
from fides.api.service.connectors.query_configs.bigquery_query_config import (
BigQueryQueryConfig,
)
from fides.api.service.connectors.query_configs.query_config import SQLQueryConfig
from fides.api.service.connectors.sql_connector import SQLConnector
from fides.api.util.collection_util import Row


class BigQueryConnector(SQLConnector):
    """Connector specific to Google BigQuery.

    DSR execution (retrieval/masking) goes through SQLAlchemy with the
    ``bigquery://`` dialect; connection testing uses the native BigQuery
    python client (matching the detection/discovery codepaths).
    """

    secrets_schema = BigQuerySchema

    # Overrides BaseConnector.build_uri
    def build_uri(self) -> str:
        """Build a SQLAlchemy URI of the form ``bigquery://<project_id>[/<dataset>]``."""
        config = self.secrets_schema(**self.configuration.secrets or {})
        dataset = f"/{config.dataset}" if config.dataset else ""
        return f"bigquery://{config.keyfile_creds.project_id}{dataset}"  # pylint: disable=no-member

    # Overrides SQLConnector.create_client
    def create_client(self) -> Engine:
        """
        Returns a SQLAlchemy Engine that can be used to interact with Google BigQuery.

        Overrides to pass in credentials_info
        """
        secrets = self.configuration.secrets or {}
        # An explicit "url" secret takes precedence over the constructed URI.
        uri = secrets.get("url") or self.build_uri()

        keyfile_creds = secrets.get("keyfile_creds", {})
        credentials_info = dict(keyfile_creds) if keyfile_creds else {}

        return create_engine(
            uri,
            credentials_info=credentials_info,
            hide_parameters=self.hide_parameters,
            echo=not self.hide_parameters,
        )

    # Overrides SQLConnector.query_config
    def query_config(self, node: ExecutionNode) -> BigQueryQueryConfig:
        """Query wrapper corresponding to the input execution_node."""

        db: Session = Session.object_session(self.configuration)
        return BigQueryQueryConfig(
            node, SQLConnector.get_namespace_meta(db, node.address.dataset)
        )

    def partitioned_retrieval(
        self,
        query_config: SQLQueryConfig,
        connection: Connection,
        stmt: TextClause,
    ) -> List[Row]:
        """
        Retrieve data against a partitioned table using the partitioning spec configured for this node to execute
        multiple queries against the partitioned table.

        This is only supported by the BigQueryConnector currently.

        NOTE: when we deprecate `where_clause` partitioning in favor of a more proper partitioning DSL,
        we should be sure to still support the existing `where_clause` partition definition on
        any in-progress DSRs so that they can run through to completion.

        Raises:
            TypeError: if ``query_config`` is not a ``BigQueryQueryConfig``.
        """
        if not isinstance(query_config, BigQueryQueryConfig):
            raise TypeError(
                f"Unexpected query config of type '{type(query_config)}' passed to BigQueryConnector's `partitioned_retrieval`"
            )

        partition_clauses = query_config.get_partition_clauses()
        logger.info(
            f"Executing {len(partition_clauses)} partition queries for node '{query_config.node.address}' in DSR execution"
        )
        # The base statement's bind params are invariant across partitions;
        # compile once instead of once per partition clause.
        existing_bind_params = stmt.compile().params
        rows = []
        for partition_clause in partition_clauses:
            logger.debug(
                f"Executing partition query with partition clause '{partition_clause}'"
            )
            # Append the partition predicate to the base statement; re-apply the
            # original bind params since text() produces a fresh TextClause.
            partitioned_stmt = text(f"{stmt} AND ({partition_clause})").params(
                existing_bind_params
            )
            results = connection.execute(partitioned_stmt)
            rows.extend(self.cursor_result_to_rows(results))
        return rows

    # Overrides SQLConnector.test_connection
    def test_connection(self) -> Optional[ConnectionTestStatus]:
        """
        Overrides SQLConnector.test_connection with a BigQuery-specific connection test.

        The connection is tested using the native python client for BigQuery, since that is what's used
        by the detection and discovery workflows/codepaths.
        TODO: migrate the rest of this class, used for DSR execution, to also make use of the native bigquery client.

        Raises:
            ConnectionException: if no projects are visible or the client errors.
        """
        try:
            bq_schema = BigQuerySchema(**self.configuration.secrets or {})
            client = bq_schema.get_client()
            all_projects = list(client.list_projects())
            if all_projects:
                return ConnectionTestStatus.succeeded
            logger.error("No Bigquery Projects found with the provided credentials.")
            raise ConnectionException(
                "No Bigquery Projects found with the provided credentials."
            )
        except Exception as e:
            logger.exception(f"Error testing connection to remote BigQuery {str(e)}")
            # Chain the original exception for easier debugging upstream.
            raise ConnectionException(f"Connection error: {e}") from e

    def mask_data(
        self,
        node: ExecutionNode,
        policy: Policy,
        privacy_request: PrivacyRequest,
        request_task: RequestTask,
        rows: List[Row],
    ) -> int:
        """Execute a masking request. Returns the number of records updated or deleted"""
        query_config = self.query_config(node)
        update_or_delete_ct = 0
        client = self.client()
        for row in rows:
            update_or_delete_stmts: List[Executable] = (
                query_config.generate_masking_stmt(
                    node, row, policy, privacy_request, client
                )
            )
            if update_or_delete_stmts:
                # One connection per row's batch of statements.
                with client.connect() as connection:
                    for update_or_delete_stmt in update_or_delete_stmts:
                        results: LegacyCursorResult = connection.execute(
                            update_or_delete_stmt
                        )
                        update_or_delete_ct += results.rowcount
        return update_or_delete_ct
5 changes: 4 additions & 1 deletion src/fides/api/service/connectors/dynamodb_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
DynamoDBSchema,
)
from fides.api.service.connectors.base_connector import BaseConnector
from fides.api.service.connectors.query_config import DynamoDBQueryConfig, QueryConfig
from fides.api.service.connectors.query_configs.dynamodb_query_config import (
DynamoDBQueryConfig,
)
from fides.api.service.connectors.query_configs.query_config import QueryConfig
from fides.api.util.aws_util import get_aws_session
from fides.api.util.collection_util import Row
from fides.api.util.logger import Pii
Expand Down
2 changes: 1 addition & 1 deletion src/fides/api/service/connectors/fides_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from fides.api.schemas.redis_cache import Identity
from fides.api.service.connectors.base_connector import BaseConnector
from fides.api.service.connectors.fides.fides_client import FidesClient
from fides.api.service.connectors.query_config import QueryConfig
from fides.api.service.connectors.query_configs.query_config import QueryConfig
from fides.api.util.collection_util import Row
from fides.api.util.errors import FidesError

Expand Down
56 changes: 56 additions & 0 deletions src/fides/api/service/connectors/google_cloud_mysql_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from typing import List

import pymysql
from google.cloud.sql.connector import Connector
from google.oauth2 import service_account
from sqlalchemy.engine import Engine, LegacyCursorResult, create_engine # type: ignore

from fides.api.schemas.connection_configuration.connection_secrets_google_cloud_sql_mysql import (
GoogleCloudSQLMySQLSchema,
)
from fides.api.service.connectors.sql_connector import SQLConnector
from fides.api.util.collection_util import Row
from fides.config import get_config

CONFIG = get_config()


class GoogleCloudSQLMySQLConnector(SQLConnector):
    """Connector for Google Cloud SQL (MySQL flavor).

    Authenticates with a service-account keyfile and IAM database
    authentication via the Cloud SQL Python Connector, so no URI-based
    connection string is ever built.
    """

    secrets_schema = GoogleCloudSQLMySQLSchema

    # Overrides SQLConnector.create_client
    def create_client(self) -> Engine:
        """Returns a SQLAlchemy Engine that can be used to interact with a database"""

        config = self.secrets_schema(**self.configuration.secrets or {})

        # Load the service-account credentials from the stored keyfile secrets.
        sa_credentials = service_account.Credentials.from_service_account_info(
            dict(config.keyfile_creds)
        )

        # Cloud SQL connector handles the actual socket/auth handshake.
        cloud_sql_connector = Connector(credentials=sa_credentials)

        def _open_connection() -> pymysql.connections.Connection:
            # IAM auth: the db_iam_user identity is used instead of a password.
            connection: pymysql.connections.Connection = cloud_sql_connector.connect(
                config.instance_connection_name,
                "pymysql",
                user=config.db_iam_user,
                db=config.dbname,
                enable_iam_auth=True,
            )
            return connection

        # The creator callable supplies connections, so the URL carries no host info.
        return create_engine("mysql+pymysql://", creator=_open_connection)

    @staticmethod
    def cursor_result_to_rows(results: LegacyCursorResult) -> List[Row]:
        """Convert a cursor result into a list of row dictionaries."""
        return SQLConnector.default_cursor_result_to_rows(results)

    def build_uri(self) -> None:
        """
        We need to override this method so it is not abstract anymore, and GoogleCloudSQLMySQLConnector is instantiable.
        """
Loading
Loading