From e8bc6cd23afba2e916cd96ba02e8e920c30712ae Mon Sep 17 00:00:00 2001 From: Hanna Liashchuk Date: Thu, 29 Dec 2022 19:26:31 +0200 Subject: [PATCH] Added server side parameters for thrift connection type --- .changes/unreleased/Features-20221229-200956.yaml | 7 +++++++ dbt/adapters/spark/connections.py | 6 +++++- tests/unit/test_adapter.py | 10 ++++++---- 3 files changed, 18 insertions(+), 5 deletions(-) create mode 100644 .changes/unreleased/Features-20221229-200956.yaml diff --git a/.changes/unreleased/Features-20221229-200956.yaml b/.changes/unreleased/Features-20221229-200956.yaml new file mode 100644 index 000000000..1add9bf72 --- /dev/null +++ b/.changes/unreleased/Features-20221229-200956.yaml @@ -0,0 +1,7 @@ +kind: Features +body: Support server side parameters in thrift connection +time: 2022-12-29T20:09:56.457776+02:00 +custom: + Author: ' hanna-liashchuk' + Issue: "387" + PR: "577" diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py index a606beb78..d28eaccc5 100644 --- a/dbt/adapters/spark/connections.py +++ b/dbt/adapters/spark/connections.py @@ -379,7 +379,10 @@ def open(cls, connection): kerberos_service_name=creds.kerberos_service_name, password=creds.password, ) - conn = hive.connect(thrift_transport=transport) + conn = hive.connect( + thrift_transport=transport, + configuration=creds.server_side_parameters + ) else: conn = hive.connect( host=creds.host, @@ -388,6 +391,7 @@ def open(cls, connection): auth=creds.auth, kerberos_service_name=creds.kerberos_service_name, password=creds.password, + configuration=creds.server_side_parameters ) # noqa handle = PyhiveConnectionWrapper(conn) elif creds.method == SparkConnectionMethod.ODBC: diff --git a/tests/unit/test_adapter.py b/tests/unit/test_adapter.py index d24bc8a2f..2e63229f3 100644 --- a/tests/unit/test_adapter.py +++ b/tests/unit/test_adapter.py @@ -154,13 +154,14 @@ def test_thrift_connection(self): config = self._get_target_thrift(self.project_cfg) adapter = SparkAdapter(config) - def hive_thrift_connect(host, port, username, auth, kerberos_service_name, password): + def hive_thrift_connect(host, port, username, auth, kerberos_service_name, password, configuration): self.assertEqual(host, 'myorg.sparkhost.com') self.assertEqual(port, 10001) self.assertEqual(username, 'dbt') self.assertIsNone(auth) self.assertIsNone(kerberos_service_name) self.assertIsNone(password) + self.assertDictEqual(configuration, {}) with mock.patch.object(hive, 'connect', new=hive_thrift_connect): connection = adapter.acquire_connection('dummy') @@ -175,11 +176,12 @@ def test_thrift_ssl_connection(self): config = self._get_target_use_ssl_thrift(self.project_cfg) adapter = SparkAdapter(config) - def hive_thrift_connect(thrift_transport): + def hive_thrift_connect(thrift_transport, configuration): self.assertIsNotNone(thrift_transport) transport = thrift_transport._trans self.assertEqual(transport.host, 'myorg.sparkhost.com') self.assertEqual(transport.port, 10001) + self.assertDictEqual(configuration, {}) with mock.patch.object(hive, 'connect', new=hive_thrift_connect): connection = adapter.acquire_connection('dummy') @@ -194,13 +196,14 @@ def test_thrift_connection_kerberos(self): config = self._get_target_thrift_kerberos(self.project_cfg) adapter = SparkAdapter(config) - def hive_thrift_connect(host, port, username, auth, kerberos_service_name, password): + def hive_thrift_connect(host, port, username, auth, kerberos_service_name, password, configuration): self.assertEqual(host, 'myorg.sparkhost.com') self.assertEqual(port, 10001) self.assertEqual(username, 'dbt') self.assertEqual(auth, 'KERBEROS') self.assertEqual(kerberos_service_name, 'hive') self.assertIsNone(password) + self.assertDictEqual(configuration, {}) with mock.patch.object(hive, 'connect', new=hive_thrift_connect): connection = adapter.acquire_connection('dummy') @@ -734,4 +737,3 @@ def test_parse_columns_from_information_with_table_type_and_parquet_provider(sel 'stats:rows:label': 'rows', 'stats:rows:value': 12345678 }) -