Move Reflections to Rest API Calls and Extend Implementation #256

Merged
merged 7 commits into from
Jan 7, 2025
@@ -50,4 +50,28 @@ curl -s -X PUT "http://localhost:9047/apiv2/source/dbt_test_source" \
-H "Authorization: _dremio$AUTH_TOKEN" \
--data "{\"name\":\"dbt_test_source\",\"config\":{\"credentialType\":\"ACCESS_KEY\",\"accessKey\":\"$MINIO_ROOT_USER\",\"accessSecret\":\"$MINIO_ROOT_PASSWORD\",\"secure\":false,\"externalBucketList\":[],\"enableAsync\":true,\"enableFileStatusCheck\":true,\"rootPath\":\"/\",\"defaultCtasFormat\":\"ICEBERG\",\"propertyList\":[{\"name\":\"fs.s3a.path.style.access\",\"value\":\"true\"},{\"name\":\"fs.s3a.endpoint\",\"value\":\"minio:9000\"},{\"name\":\"dremio.s3.compat\",\"value\":\"true\"}],\"whitelistedBuckets\":[],\"isCachingEnabled\":false,\"maxCacheSpacePct\":100},\"type\":\"S3\",\"metadataPolicy\":{\"deleteUnavailableDatasets\":true,\"autoPromoteDatasets\":false,\"namesRefreshMillis\":3600000,\"datasetDefinitionRefreshAfterMillis\":3600000,\"datasetDefinitionExpireAfterMillis\":10800000,\"authTTLMillis\":86400000,\"updateMode\":\"PREFETCH_QUERIED\"}}"

echo "S3 Source created in Dremio."

echo "Creating the Samples source in Dremio..."
curl -s -X PUT "http://localhost:9047/apiv2/source/Samples" \
-H "Content-Type: application/json" \
-H "Authorization: _dremio$AUTH_TOKEN" \
--data-raw "{\"name\":\"Samples\",\"config\":{\"externalBucketList\":[\"samples.dremio.com\"],\"credentialType\":\"NONE\",\"secure\":false,\"propertyList\":[]},\"accelerationRefreshPeriod\":3600000,\"accelerationGracePeriod\":10800000,\"accelerationNeverRefresh\":true,\"accelerationNeverExpire\":true,\"accelerationActivePolicyType\":\"PERIOD\",\"accelerationRefreshSchedule\":\"0 0 8 * * *\",\"accelerationRefreshOnDataChanges\":false,\"type\":\"S3\"}"

echo "Samples source created in Dremio."

echo "Formatting SF_incidents2016..."
curl -s -X PUT "http://localhost:9047/apiv2/source/Samples/file_format/samples.dremio.com/SF_incidents2016.json" \
-H "Content-Type: application/json" \
-H "Authorization: _dremio$AUTH_TOKEN" \
--data-raw "{\"type\":\"JSON\"}"

echo "SF_incidents2016 formatted in Dremio."

echo "Formatting NYC-taxi-trips..."
curl -s -X PUT "http://localhost:9047/apiv2/source/Samples/folder_format/samples.dremio.com/NYC-taxi-trips" \
-H "Content-Type: application/json" \
-H "Authorization: _dremio$AUTH_TOKEN" \
--data-raw "{\"ignoreOtherFileFormats\":false,\"type\":\"Parquet\"}"

echo "NYC-taxi-trips formatted in Dremio."
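
The hand-escaped JSON bodies in the curl calls above are easy to get wrong. As a minimal sketch, the same request bodies can be generated with Python's `json` module instead. The helper names `file_format_body` and `folder_format_body` are hypothetical and not part of this PR; only the field names are taken from the script.

```python
import json


def file_format_body(fmt: str) -> str:
    """Body for the file_format call, e.g. {"type":"JSON"}."""
    # Compact separators reproduce the spacing of the hand-written payloads.
    return json.dumps({"type": fmt}, separators=(",", ":"))


def folder_format_body(fmt: str, ignore_other_formats: bool = False) -> str:
    """Body for the folder_format call used for NYC-taxi-trips."""
    body = {"ignoreOtherFileFormats": ignore_other_formats, "type": fmt}
    return json.dumps(body, separators=(",", ":"))


sf_body = file_format_body("JSON")
nyc_body = folder_format_body("Parquet")
print(sf_body)
print(nyc_body)
```

Generating the body this way removes the need for backslash-escaped quotes, at the cost of a Python dependency in the CI script.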
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -48,8 +48,8 @@ jobs:
       - name: Create MinIO bucket
         run: bash .github/scripts/create_minio_bucket.sh

-      - name: Create Dremio S3 Source
-        run: bash .github/scripts/create_dremio_s3_source.sh
+      - name: Create and Format Sources
+        run: bash .github/scripts/create_and_format_sources.sh

       - name: Install Dependencies
         uses: actions/setup-python@v4
12 changes: 11 additions & 1 deletion CHANGELOG.md
@@ -3,8 +3,18 @@
## Changes

- Added [DremioRestClient](dbt/adapters/dremio/api/rest/client.py) to isolate all Dremio API calls inside one class
- [#256](https://github.com/dremio/dbt-dremio/pull/256) Reflections are now handled through the REST API
- Non-admin users are now able to use reflections
- It is now possible to set a custom name for reflections
- If a reflection with the name defined in the model already exists in the dataset, it is updated instead of a new one being created
- Added a `date_dimensions` parameter to the reflection materialization, to set fields that have `DATE` granularity
- Added distribution fields under `distribute_by`
- Added partition transformations under `partition_transform`
  - Defaults to Original/Identity if not defined
  - Supported values: `year/month/day/hour/bucket(n)/truncate(n)`
- Computations default to `SUM, COUNT` if the mapped measure is numeric, and to `COUNT` if not
- The `reflections_enabled` adapter option has been renamed to `reflections_metadata_enabled` (requires user privileges to run in Dremio)
- Removed the duplicated macros `array_append` and `array_concat`, as Dremio already provides analogous SQL functions
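
The default-computation rule in the list above could be sketched roughly as follows. This is only an illustration: `NUMERIC_TYPES` and `default_computations` are hypothetical names, and the set of type names treated as numeric is an assumption, not the adapter's actual check.

```python
# Assumed set of numeric type names; the adapter's real check may differ.
NUMERIC_TYPES = {"INTEGER", "BIGINT", "FLOAT", "DOUBLE", "DECIMAL"}


def default_computations(column_type: str) -> str:
    """Return the default computations for a measure column as a comma-separated string."""
    # Drop any precision/scale suffix such as "(10,2)" before comparing.
    base_type = column_type.split("(")[0].strip().upper()
    return "SUM,COUNT" if base_type in NUMERIC_TYPES else "COUNT"


print(default_computations("decimal(10,2)"))
print(default_computations("VARCHAR"))
```

The result is returned without spaces around the comma, so that splitting it on `,` yields clean measure type names.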

## Dependency

- [#222](https://github.com/dremio/dbt-dremio/issues/222) Upgrade dbt-core to 1.8.8 and dbt-tests-adapter to 1.8.0
31 changes: 28 additions & 3 deletions dbt/adapters/dremio/api/rest/client.py
@@ -15,12 +15,11 @@
# limitations under the License.



import requests

from dbt.adapters.dremio.api.authentication import DremioPatAuthentication
from dbt.adapters.dremio.api.parameters import Parameters
-from dbt.adapters.dremio.api.rest.utils import _post, _get, _delete
+from dbt.adapters.dremio.api.rest.utils import _post, _get, _put, _delete
from dbt.adapters.dremio.api.rest.url_builder import UrlBuilder

from dbt.adapters.events.logging import AdapterLogger
@@ -132,4 +131,30 @@ def delete_catalog(self, cid):
url,
self._parameters.authentication.get_headers(),
ssl_verify=self._parameters.authentication.verify_ssl,
)

    def get_reflections(self, dataset_id):
        url = UrlBuilder.get_reflection_url(self._parameters, dataset_id)
        return _get(
            url,
            self._parameters.authentication.get_headers(),
            ssl_verify=self._parameters.authentication.verify_ssl,
        )

    def create_reflection(self, payload):
        url = UrlBuilder.create_reflection_url(self._parameters)
        return _post(
            url,
            self._parameters.authentication.get_headers(),
            json=payload,
            ssl_verify=self._parameters.authentication.verify_ssl,
        )

    def update_reflection(self, reflection_id, payload):
        url = UrlBuilder.update_reflection_url(self._parameters, reflection_id)
        return _put(
            url,
            self._parameters.authentication.get_headers(),
            json=payload,
            ssl_verify=self._parameters.authentication.verify_ssl,
        )
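
The three client methods above are enough to express the "update if a reflection with the same name exists, otherwise create" behavior described in the changelog. The following is a rough sketch of that flow, not the adapter's actual code: `upsert_reflection` and `FakeClient` are hypothetical, and the response shape (a `data` list whose items carry `id`, `name`, and `tag`) is an assumption about the Dremio API.

```python
def upsert_reflection(client, dataset_id, payload):
    """Update the reflection named payload["name"] if present, else create it."""
    # Assumption: the listing response wraps the reflections in a "data" list.
    existing = client.get_reflections(dataset_id).get("data", [])
    for reflection in existing:
        if reflection.get("name") == payload["name"]:
            # Assumption: updates must carry the current version tag.
            updated = dict(payload, tag=reflection.get("tag"))
            return client.update_reflection(reflection["id"], updated)
    return client.create_reflection(payload)


class FakeClient:
    """In-memory stand-in for the REST client, used only for this example."""

    def __init__(self):
        self.reflections = []
        self.calls = []

    def get_reflections(self, dataset_id):
        return {"data": [r for r in self.reflections if r["datasetId"] == dataset_id]}

    def create_reflection(self, payload):
        self.calls.append("create")
        created = dict(payload, id=f"r{len(self.reflections) + 1}", tag="v1")
        self.reflections.append(created)
        return created

    def update_reflection(self, reflection_id, payload):
        self.calls.append("update")
        for reflection in self.reflections:
            if reflection["id"] == reflection_id:
                reflection.update(payload)
                return reflection
        raise KeyError(reflection_id)


client = FakeClient()
payload = {"name": "my_reflection", "datasetId": "ds-1", "type": "RAW"}
upsert_reflection(client, "ds-1", payload)
upsert_reflection(client, "ds-1", payload)
print(client.calls)
```

Running the same model twice against the fake client results in one create followed by one update, leaving a single stored reflection.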
Empty file.
141 changes: 141 additions & 0 deletions dbt/adapters/dremio/api/rest/entities/reflection.py
@@ -0,0 +1,141 @@
from enum import Enum


class TransformType(Enum):
    YEAR = "YEAR"
    MONTH = "MONTH"
    DAY = "DAY"
    HOUR = "HOUR"
    IDENTITY = "IDENTITY"
    BUCKET = "BUCKET"
    TRUNCATE = "TRUNCATE"

    @classmethod
    def from_string(cls, transform_str):
        transform_str = transform_str.upper()

        # Parameterized transforms are recognized by prefix, e.g. "BUCKET(4)".
        if transform_str.startswith("BUCKET("):
            return cls.BUCKET
        elif transform_str.startswith("TRUNCATE("):
            return cls.TRUNCATE

        # Any unrecognized value falls back to IDENTITY.
        try:
            return cls(transform_str)
        except ValueError:
            return cls.IDENTITY

    def to_transform(self, raw_str):
        if self in (
            TransformType.YEAR,
            TransformType.MONTH,
            TransformType.DAY,
            TransformType.HOUR,
            TransformType.IDENTITY
        ):
            return {"type": self.value}

        if self == TransformType.BUCKET:
            # Extract n from "bucket(n)".
            bucket_count = int(raw_str.split("(")[1].split(")")[0])
            return {
                "type": "BUCKET",
                "bucketTransform": {"bucketCount": bucket_count},
            }

        if self == TransformType.TRUNCATE:
            # Extract n from "truncate(n)".
            truncate_length = int(raw_str.split("(")[1].split(")")[0])
            return {
                "type": "TRUNCATE",
                "truncateTransform": {"truncateLength": truncate_length},
            }

        return {"type": TransformType.IDENTITY.value}


# https://docs.dremio.com/current/reference/api/reflections/
class ReflectionEntity:
    def __init__(self, name, reflection_type, dataset_id, display_fields, dimensions, date_dimensions, measures,
                 computations, partition_by, partition_transform, partition_method, distribute_by, localsort_by,
                 arrow_cache):
        self.__name = name
        self.__type = reflection_type
        self.__dataset_id = dataset_id
        self.__partition_method = partition_method.upper()
        self.__display_fields = display_fields
        self.__dimensions_fields = dimensions
        self.__date_dimensions_fields = date_dimensions
        self.__measure_fields = measures
        self.__computation_fields = computations
        self.__partition_by_fields = partition_by
        self.__partition_transformations = partition_transform
        self.__distribute_by_fields = distribute_by
        self.__local_sort_fields = localsort_by
        self.__arrow_cache = arrow_cache

    def buildDisplayFields(self):
        return [{"name": field} for field in self.__display_fields]

    def buildDimensionFields(self):
        return [{"name": field} for field in self.__dimensions_fields]

    def buildDateFields(self):
        return [{"name": date_dimension, "granularity": "DATE"} for date_dimension in self.__date_dimensions_fields]

    def buildMeasureFields(self):
        # Measures and computations are paired by position.
        return [{"name": measure, "measureTypeList": computation.split(',')} for
                measure, computation in zip(self.__measure_fields, self.__computation_fields)]

    def buildPartitionFields(self):
        # Without explicit transformations, every partition field uses IDENTITY.
        if not self.__partition_transformations:
            self.__partition_transformations = ["IDENTITY"] * len(self.__partition_by_fields)

        partition_fields = []

        for partition, transform in zip(self.__partition_by_fields, self.__partition_transformations):
            transform_type = TransformType.from_string(transform)
            partition_fields.append({
                "name": partition,
                "transform": transform_type.to_transform(transform)
            })

        return partition_fields

    def buildDistributionFields(self):
        return [{"name": distribute} for distribute in self.__distribute_by_fields]

    def buildSortFields(self):
        return [{"name": sort} for sort in self.__local_sort_fields]

    def build_payload(self):
        payload = {
            "type": self.__type,
            "name": self.__name,
            "datasetId": self.__dataset_id,
            "enabled": True,
            "arrowCachingEnabled": self.__arrow_cache,
            "partitionDistributionStrategy": self.__partition_method.upper(),
            "entityType": "reflection"
        }

        # Optional sections are only sent when the model defines them.
        if self.__display_fields:
            payload["displayFields"] = self.buildDisplayFields()

        if self.__dimensions_fields:
            payload["dimensionFields"] = self.buildDimensionFields()

        if self.__date_dimensions_fields:
            payload["dateFields"] = self.buildDateFields()

        if self.__measure_fields and self.__computation_fields:
            payload["measureFields"] = self.buildMeasureFields()

        if self.__partition_by_fields:
            payload["partitionFields"] = self.buildPartitionFields()

        if self.__distribute_by_fields:
            payload["distributionFields"] = self.buildDistributionFields()

        if self.__local_sort_fields:
            payload["sortFields"] = self.buildSortFields()

        return payload
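
To make the partition-transform mapping above easier to follow, here is a standalone sketch of the same idea using a regular expression. It is not the code in this PR: `to_transform` and `partition_fields` are hypothetical free functions, and unlike `TransformType.to_transform`, a malformed argument such as `bucket(abc)` falls back to `IDENTITY` here instead of raising `ValueError`.

```python
import re

SIMPLE_TRANSFORMS = {"YEAR", "MONTH", "DAY", "HOUR", "IDENTITY"}
_PARAMETERIZED = re.compile(r"^(BUCKET|TRUNCATE)\((\d+)\)$")


def to_transform(raw: str) -> dict:
    """Map a string such as 'year' or 'bucket(4)' to a transform dict."""
    text = raw.strip().upper()
    if text in SIMPLE_TRANSFORMS:
        return {"type": text}
    match = _PARAMETERIZED.match(text)
    if match is None:
        # Unknown or malformed values fall back to IDENTITY in this sketch.
        return {"type": "IDENTITY"}
    kind, value = match.group(1), int(match.group(2))
    if kind == "BUCKET":
        return {"type": "BUCKET", "bucketTransform": {"bucketCount": value}}
    return {"type": "TRUNCATE", "truncateTransform": {"truncateLength": value}}


def partition_fields(names, transforms=None):
    """Pair each partition column with its transform, defaulting to IDENTITY."""
    transforms = transforms or ["IDENTITY"] * len(names)
    return [{"name": n, "transform": to_transform(t)} for n, t in zip(names, transforms)]


fields = partition_fields(["order_date", "customer_id"], ["month", "bucket(4)"])
print(fields)
```

As in `buildPartitionFields`, `zip` stops at the shorter list, so a partition column without a matching transform entry is silently dropped; validating that both lists have the same length may be worth considering.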
62 changes: 53 additions & 9 deletions dbt/adapters/dremio/api/rest/url_builder.py
@@ -34,6 +34,12 @@ class UrlBuilder:
SOFTWARE_CATALOG_ENDPOINT = "/api/v3/catalog"
CLOUD_CATALOG_ENDPOINT = CLOUD_PROJECT_ENDPOINT + "/{}/catalog"

SOFTWARE_REFLECTIONS_ENDPOINT = "/api/v3/reflection"
CLOUD_REFLECTIONS_ENDPOINT = CLOUD_PROJECT_ENDPOINT + "/{}/reflection"

SOFTWARE_DATASET_ENDPOIT = "/api/v3/dataset"
CLOUD_DATASET_ENDPOINT = CLOUD_PROJECT_ENDPOINT + "/{}/dataset"

# https://docs.dremio.com/software/rest-api/jobs/get-job/
OFFSET_DEFAULT = 0
LIMIT_DEFAULT = 100
@@ -56,10 +62,10 @@ def sql_url(cls, parameters: Parameters):
def job_status_url(cls, parameters: Parameters, job_id):
if type(parameters) is CloudParameters:
return (
parameters.base_url
+ UrlBuilder.CLOUD_JOB_ENDPOINT.format(parameters.cloud_project_id)
+ "/"
+ job_id
)
return parameters.base_url + UrlBuilder.SOFTWARE_JOB_ENDPOINT + "/" + job_id

@@ -75,11 +81,11 @@ def job_cancel_url(cls, parameters: Parameters, job_id):

@classmethod
def job_results_url(
cls,
parameters: Parameters,
job_id,
offset=OFFSET_DEFAULT,
limit=LIMIT_DEFAULT,
):
url_path = parameters.base_url
if type(parameters) is CloudParameters:
@@ -139,3 +145,41 @@ def catalog_item_by_path_url(cls, parameters: Parameters, path_list):
joined_path_str = "/".join(quoted_path_list).replace('"', "")
endpoint = f"/by-path/{joined_path_str}"
return url_path + endpoint

    @classmethod
    def create_reflection_url(cls, parameters: Parameters):
        url_path = parameters.base_url
        if type(parameters) is CloudParameters:
            url_path += UrlBuilder.CLOUD_REFLECTIONS_ENDPOINT.format(
                parameters.cloud_project_id
            )
        else:
            url_path += UrlBuilder.SOFTWARE_REFLECTIONS_ENDPOINT

        return url_path

    @classmethod
    def update_reflection_url(cls, parameters: Parameters, reflection_id):
        # The path segment is the id of the reflection being updated, not the dataset id.
        url_path = parameters.base_url
        if type(parameters) is CloudParameters:
            url_path += UrlBuilder.CLOUD_REFLECTIONS_ENDPOINT.format(
                parameters.cloud_project_id
            )
        else:
            url_path += UrlBuilder.SOFTWARE_REFLECTIONS_ENDPOINT

        endpoint = "/{}".format(reflection_id)
        return url_path + endpoint

    @classmethod
    def get_reflection_url(cls, parameters: Parameters, dataset_id):
        url_path = parameters.base_url
        if type(parameters) is CloudParameters:
            url_path += UrlBuilder.CLOUD_DATASET_ENDPOINT.format(
                parameters.cloud_project_id
            )
        else:
            url_path += UrlBuilder.SOFTWARE_DATASET_ENDPOIT

        endpoint = "/{}/reflection".format(dataset_id)
        return url_path + endpoint
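
The three new builders share the same software/cloud branching, so one possible simplification is a single prefix helper. The sketch below is only an illustration: `api_url` is hypothetical, and the value used for the cloud prefix (`/v0/projects/{}`) is an assumption, since `CLOUD_PROJECT_ENDPOINT` is not shown in this diff.

```python
from typing import Optional

SOFTWARE_PREFIX = "/api/v3"
# Assumed expansion of CLOUD_PROJECT_ENDPOINT + "/{}"; not shown in this diff.
CLOUD_PREFIX = "/v0/projects/{}"


def api_url(base_url: str, path: str, project_id: Optional[str] = None) -> str:
    """Build a software URL, or a cloud URL when a project id is given."""
    prefix = CLOUD_PREFIX.format(project_id) if project_id else SOFTWARE_PREFIX
    return base_url.rstrip("/") + prefix + path


create_url = api_url("http://localhost:9047", "/reflection")
update_url = api_url("http://localhost:9047", "/reflection/{}".format("refl-1"))
list_url = api_url("https://api.dremio.cloud", "/dataset/ds-1/reflection", project_id="p1")
print(create_url)
print(update_url)
print(list_url)
```

With such a helper, each builder would reduce to a one-line call, and the software and cloud endpoint constants could not drift apart.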
22 changes: 13 additions & 9 deletions dbt/adapters/dremio/api/rest/utils.py
@@ -31,7 +31,6 @@
import json as jsonlib
from requests.exceptions import HTTPError


from dbt.adapters.events.logging import AdapterLogger

logger = AdapterLogger("dremio")
@@ -45,12 +44,12 @@ def _get(url, request_headers, details="", ssl_verify=True):


def _post(
url,
request_headers=None,
json=None,
details="",
ssl_verify=True,
timeout=None,
):
if isinstance(json, str):
json = jsonlib.loads(json)
@@ -64,6 +63,13 @@ def _post(
return _check_error(response, details)


def _put(url, request_headers, json=None, details="", ssl_verify=True):
    response = session.put(
        url, headers=request_headers, verify=ssl_verify, json=json
    )
    return _check_error(response, details)


def _delete(url, request_headers, details="", ssl_verify=True):
    response = session.delete(url, headers=request_headers, verify=ssl_verify)
    return _check_error(response, details)
@@ -148,5 +154,3 @@ def _check_error(response, details=""):
"Gateway Timeout:" + details, error, response
)
raise DremioException("Unknown error", error)

