Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
cryeo committed Sep 7, 2018
0 parents commit 09abf3f
Show file tree
Hide file tree
Showing 20 changed files with 700 additions and 0 deletions.
11 changes: 11 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
.python-version
.idea/**
.eggs/
*.egg-info/
*.egg
.tox/
.pytest_cache/
__pycache__/
build/
dist/
*~
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2018 Chaerim Yeo

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
4 changes: 4 additions & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
include README.md
include LICENSE
include requirements.txt
include tox.ini
37 changes: 37 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# redshift-unloader
Unloads the result of a query on Amazon Redshift to local storage.

## Prerequisites
- Python 3.6+
- boto3 1.7.84
- psycopg2 2.7.5
- psycopg2-binary 2.7.5

## Installation
The package is available on PyPI:

```bash
pip install redshift-unloader
```

### Usage
Unloaded data is supposed to be gzipped csv.

```py
from redshift_unloader import RedshiftUnloader

ru = RedshiftUnloader(host='<redshift host>',
port=<redshift port>,
user='<redshift user>',
password='<redshift password>',
database='<redshift database name>',
s3_bucket='<s3 bucket name>',
access_key_id='<aws access key id>',
secret_access_key='<aws secret access key>',
region='<aws region>')

# If you don't need header, set with_header as False
ru.unload(query="SELECT * FROM my_table WHERE log_time >= ''",
filename="/path/to/result.csv.gz",
with_header=True)
```
4 changes: 4 additions & 0 deletions redshift_unloader/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from redshift_unloader.redshift_unloader import RedshiftUnloader


__version__ = '0.1.0'
12 changes: 12 additions & 0 deletions redshift_unloader/credential.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import NamedTuple, Dict, Optional


class Credential(NamedTuple):
access_key_id: str
secret_access_key: str

def to_dict(self) -> Dict[str, Optional[str]]:
return {
"ACCESS_KEY_ID": self.access_key_id,
"SECRET_ACCESS_KEY": self.secret_access_key
}
9 changes: 9 additions & 0 deletions redshift_unloader/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import logging

logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s / %(name)s] [%(levelname)s] %(message)s"
)

logger = logging.getLogger("redshift-unloader")
logger.setLevel(logging.DEBUG)
109 changes: 109 additions & 0 deletions redshift_unloader/redshift.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import psycopg2
import psycopg2.extensions
import re

from typing import Dict, List, Optional

from redshift_unloader.credential import Credential
from redshift_unloader.logger import logger


class Redshift:
__credential: Credential
__connection: psycopg2.extensions.connection
__cursor: psycopg2.extensions.cursor

def __init__(self, host: str, port: int, user: str, password: str,
database: str, credential: Credential) -> None:
self.__credential = credential
self.__connection = psycopg2.connect(
host=host,
port=port,
user=user,
password=password,
database=database)
self.__cursor = self.__connection.cursor()

def __del__(self) -> None:
try:
self.__connection.close()
self.__cursor.close()
except:
pass

def get_columns(self, query: str) -> List[str]:
sql = self.__generate_get_columns_sql(query)
logger.debug("query: %s", sql)

try:
self.__cursor.execute(sql)
result = [column.name for column in self.__cursor.description]

return result
except Exception as e:
raise e

def unload(self,
query: str,
s3_uri: str,
manifest: bool = False,
delimiter: Optional[str] = None,
fixed_width: Optional[str] = None,
encrypted: bool = None,
gzip: bool = False,
add_quotes: bool = False,
null_string: Optional[str] = None,
escape: bool = False,
allow_overwrite: bool = False,
parallel: bool = True,
max_file_size: Optional[str] = None) -> bool:
options: Dict[str, Optional[str]] = {}

if manifest:
options['MANIFEST'] = None
if delimiter is not None:
options['DELIMITER'] = f"'{delimiter}'"
if fixed_width is not None:
options['FIXEDWIDTH'] = f"'fixed_width'"
if encrypted:
options['ENCRYPTED'] = None
if gzip:
options['GZIP'] = None
if add_quotes:
options['ADDQUOTES'] = None
if null_string is not None:
options['NULL'] = f"'{null_string}'"
if escape:
options['ESCAPE'] = None
if allow_overwrite:
options['ALLOWOVERWRITE'] = None
options['PARALLEL'] = 'ON' if parallel else 'OFF'
if max_file_size is not None:
options['MAXFILESIZE'] = max_file_size

sql = self.__generate_unload_sql(self.__escaped_query(query), s3_uri, self.__credential, options)
logger.debug("query: %s", sql)

try:
self.__cursor.execute(sql)
self.__connection.commit()

return True
except Exception as e:
raise e

@staticmethod
def __escaped_query(query: str) -> str:
return re.sub(r'[\\\']', lambda x: '\\' + x.group(), query)

@staticmethod
def __generate_get_columns_sql(query: str) -> str:
return f'WITH query AS ({query}) SELECT * FROM query LIMIT 0'

@staticmethod
def __generate_unload_sql(query: str, s3_uri: str, credential: Credential, options: Dict) -> str:
partial_sqls = [f"UNLOAD ('{query}') TO '{s3_uri}'"]
partial_sqls.extend([f"{k} '{v}'" for (k, v) in credential.to_dict().items()])
partial_sqls.extend([f"{k} {v}" if v is not None else f"{k}" for (k, v) in options.items()])

return ' '.join(partial_sqls)
91 changes: 91 additions & 0 deletions redshift_unloader/redshift_unloader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import os
import shutil
import tempfile
import uuid
import gzip
import io

from redshift_unloader.credential import Credential
from redshift_unloader.redshift import Redshift
from redshift_unloader.s3 import S3
from redshift_unloader.logger import logger

KB = 1024
MB = 1024 * 1024


class RedshiftUnloader:
__redshift: Redshift
__s3: S3
__credential: Credential

def __init__(self, host: str, port: int, user: str, password: str,
database: str, s3_bucket: str, access_key_id: str,
secret_access_key: str, region: str) -> None:
credential = Credential(
access_key_id=access_key_id, secret_access_key=secret_access_key)
self.__redshift = Redshift(
host=host,
port=port,
user=user,
password=password,
database=database,
credential=credential)
self.__s3 = S3(credential=credential, bucket=s3_bucket, region=region)

def unload(self, query: str, filename: str, with_header: bool = True) -> None:
session_id = self.__generate_session_id()
logger.debug("Session id: %s", session_id)

s3_path = self.__generate_path("/tmp/redshift-unloader", session_id, '/')
local_path = self.__generate_path(tempfile.gettempdir(), session_id)

logger.debug("Get columns")
columns = self.__redshift.get_columns(query) if with_header else None

logger.debug("Unload")
self.__redshift.unload(
query,
self.__s3.uri(s3_path),
gzip=True,
parallel=True,
delimiter=',',
null_string='',
add_quotes=True,
allow_overwrite=True)

logger.debug("Fetch the list of objects")
s3_keys = self.__s3.list(s3_path.lstrip('/'))
local_files = list(map(lambda key: os.path.join(local_path, os.path.basename(key)), s3_keys))

logger.debug("Create temporary directory: %s", local_path)
os.mkdir(local_path, 0o700)

logger.debug("Download all objects")
for s3_key, local_file in zip(s3_keys, local_files):
self.__s3.download(key=s3_key, filename=local_file)

logger.debug("Merge all objects")
with open(filename, 'wb') as out:
if columns is not None:
out.write(gzip.compress((','.join(columns) + os.linesep).encode()))

for local_file in local_files:
logger.debug("Merge %s into result file", local_file)

with open(local_file, 'rb') as read:
shutil.copyfileobj(read, out, 2 * MB)

logger.debug("Remove all objects in S3")
self.__s3.delete(s3_keys)

logger.debug("Remove temporary directory in local")
shutil.rmtree(local_path)

@staticmethod
def __generate_session_id() -> str:
return str(uuid.uuid4())

@staticmethod
def __generate_path(prefix: str, session_id: str, suffix: str = '') -> str:
return ''.join([os.path.join(prefix, session_id), suffix])
44 changes: 44 additions & 0 deletions redshift_unloader/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import boto3
import boto3.resources
import urllib.parse

from typing import List

from redshift_unloader.credential import Credential
from redshift_unloader.logger import logger

MAX_DELETE_OBJECTS = 1000


class S3:
__session: boto3.session.Session
__s3: 'boto3.resources.factory.s3.ServiceResource'
__bucket: 'boto3.resources.factory.s3.Bucket'

def __init__(self, credential: Credential, bucket: str, region: str) -> None:
self.__session = boto3.session.Session(
aws_access_key_id=credential.access_key_id,
aws_secret_access_key=credential.secret_access_key,
region_name=region
)
self.__s3 = self.__session.resource('s3')
self.__bucket = self.__s3.Bucket(bucket)

def __del__(self) -> None:
pass

def uri(self, path: str) -> str:
return urllib.parse.urlunparse(['s3', self.__bucket.name, path, None, None, None])

def list(self, path: str) -> List[str]:
return [obj.key for obj in self.__bucket.objects.filter(Prefix=path)]

def delete(self, keys: List[str]) -> None:
logger.debug("Remove %s object(s) from S3", len(keys))
for i in range(0, len(keys), MAX_DELETE_OBJECTS):
sliced_keys = keys[i:i + MAX_DELETE_OBJECTS]
self.__bucket.delete_objects(Delete={'Objects': [{'Key': key} for key in sliced_keys]})

def download(self, key: str, filename: str) -> None:
logger.debug("Download %s to %s", key, filename)
self.__bucket.download_file(Key=key, Filename=filename)
2 changes: 2 additions & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
moto==1.3.5
pytest==3.7.4
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
boto3==1.7.84
psycopg2==2.7.5
psycopg2-binary==2.7.5
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[bdist_wheel]
python-tag = py36
32 changes: 32 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from setuptools import find_packages, setup

import redshift_unloader


requirements = [name.strip() for name in open('requirements.txt').readlines()]
test_requirements = [name.strip() for name in open('requirements-test.txt').readlines()]
readme = open('README.md', 'r').read()


setup(
name='redshift-unloader',
version=redshift_unloader.__version__,
description='Unload utility for Amazon Redshift',
long_description=readme,
author='Chaerim Yeo',
author_email='[email protected]',
url='https://github.com/cryeo/redshift-unloader',
license='MIT License',
packages=find_packages(exclude=('tests', 'docs')),
install_requires=requirements,
tests_require=test_requirements,
python_requires='>=3.6',
classifiers=[
'Development Status :: 2 - Pre-Alpha',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.6',
'Topic :: Software Development :: Libraries :: Python Modules',
],
)
Empty file added tests/__init__.py
Empty file.
Loading

0 comments on commit 09abf3f

Please sign in to comment.