Add loadfilename as column s3 #306
base: master
Changes from 1 commit
@@ -0,0 +1,58 @@
# Workflow: Scenario (Load file name as a column from S3)

## Scenario

The purpose of this scenario is to capture the loaded file name as data from S3 on AWS.

*Steps*
1. Obtain file names and file contents from multiple files in a specific bucket of S3.

   Sample file:
   name: files_aaa_20210401
   contents:
   ```
   aaa
   ```
   The file name includes a date, like a daily file. The file content is simply a single item. Prepare several similar files.

2. Register the file name and the contents of each file in the table.

In this scenario, custom scripts are used. Please refer to the Treasure Data documentation for custom scripts.

- [Custom Scripts](https://docs.treasuredata.com/display/public/PD/Custom+Scripts)

This scenario is just an example of how to get the file name and file contents using a custom script. You don't necessarily have to match the file format to achieve this; you can use this code as a reference to create your own.

# How to Run for Server/Client Mode
Here's how to run this sample to try it out.

As preparation, do the following:
- Since S3 bucket names are unique, change the bucket name from "td-test-bucket" to an appropriate one.
- Create an S3 bucket and create the folders "files" and "with_filename".
- Upload the files in the files folder of this sample to the files folder of the S3 bucket you created (see the sketch after this list).
- Change "database_name" in s3_filename_add.dig to an appropriate name.
- Files in the local folder should be removed before running.
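
If you prefer to script the S3 preparation instead of using the console, a minimal sketch with boto3 is below. This is not part of the sample itself; the bucket name and file are just the examples from this README, and you must replace them with your own.

```python
# Preparation sketch (assumes boto3 is installed and AWS credentials are
# already configured locally; replace the bucket name with your own).
import boto3

s3 = boto3.client("s3")
bucket = "td-test-bucket"  # S3 bucket names are globally unique

# Outside us-east-1 you would also pass
# CreateBucketConfiguration={"LocationConstraint": "<your-region>"}.
s3.create_bucket(Bucket=bucket)

# "Folders" in S3 are just key prefixes, so uploading objects under them is enough.
s3.put_object(Bucket=bucket, Key="files/files_aaa_20210401", Body=b"aaa\n")
```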

First, please upload the workflow.

    # Upload
    $ td wf push s3_filename_add

Then set the S3 Access key ID and Secret access key as workflow secrets as follows.
Access key ID,Secret access key

Review comment: This line looks like it isn't a sentence. Is this an intentional line?
Author reply: I want to add "Access key ID" and "Secret access key" comments, but I forgot to write them. Thank you.

    td wf secrets --project s3_filename_add --set s3.access_key_id=<Access key ID>
    td wf secrets --project s3_filename_add --set s3.secret_access_key=<Secret access key>

You can trigger the session manually to watch it execute.

    # Run
    $ td wf start s3_filename_add s3_filename_add --session now

After execution, "data.tmp" will be created in the "with_filename" folder on S3 and the TD table "master_with_filename" will be created in TD.

# Next Step

If you have any questions, please contact [email protected].
@@ -0,0 +1,24 @@
in:
  type: s3
  access_key_id: ${secret:s3.access_key_id}
  secret_access_key: ${secret:s3.secret_access_key}
  bucket: ${s3.bucket}
  path_prefix: ${s3.upload_path_prefix}/${s3.datafile}
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ","
    quote: "\""
    escape: "\""
    trim_if_not_quoted: false
    skip_header_lines: 0
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: filename, type: string}
    - {name: data, type: string}
filters:
- type: add_time
  to_column: {name: time}
  from_value: {mode: upload_time}
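
For reference, this parser expects the intermediate file uploaded to S3 to be plain two-column CSV (filename, data); the add_time filter then adds a time column at load time. With the sample file from the README (name files_aaa_20210401, contents aaa), data.tmp would look roughly like:

```
files_aaa_20210401,aaa
```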
@@ -0,0 +1,28 @@
timezone: Asia/Tokyo
_export:
  s3:
    bucket: td-test-bucket
    path_prefix: files
    upload_path_prefix: with_filename
    datafile: data.tmp
  td:
    database: database_name
    table: master_with_filename
    endpoint: api.treasuredata.com

+read_s3files:
  py>: s3_filename_add.main_proc
  _env:
    s3_access_key: ${secret:s3.access_key_id}
    s3_secret_key: ${secret:s3.secret_access_key}
    s3_bucket: ${s3.bucket}
    path_prefix: ${s3.path_prefix}
    upload_path_prefix: ${s3.upload_path_prefix}
    datafile: ${s3.datafile}

  docker:
    image: "digdag/digdag-python:3.9"

+load_from_s3:
  td_load>: load_s3.yml
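
For readers new to custom scripts: the +read_s3files task runs main_proc from the module s3_filename_add (the Python file in this PR) inside the digdag-python container, and each _env entry is exposed to the script as an environment variable. Only as an illustration of that handoff, not how digdag is actually implemented, the task amounts roughly to:

```python
# Rough illustration of the py> task (an assumption for explanation only).
import importlib
import os

# Values from the _env block arrive as environment variables.
os.environ["s3_bucket"] = "td-test-bucket"
os.environ["path_prefix"] = "files"

module = importlib.import_module("s3_filename_add")  # i.e. s3_filename_add.py
module.main_proc()
```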
@@ -0,0 +1,63 @@
import os
import io

# Connection settings handed over from the digdag _env block.
AWS_ACCESS_KEY = os.environ.get('s3_access_key')
AWS_SECRET_ACCESS_KEY = os.environ.get('s3_secret_key')
BUCKET_NAME = os.environ.get('s3_bucket')
PATH_PREFIX = os.environ.get('path_prefix')
UPLOAD_PATH_PREFIX = os.environ.get('upload_path_prefix')
DATAFILE = os.environ.get('datafile')


def set_s3_config(AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY):
    """Write the default AWS credentials file so that boto3 can pick it up."""
    homepath = os.environ.get("HOME")
    configDir = homepath + "/.aws"
    configFile = "credentials"
    credential = "[default]\n" + "aws_access_key_id=" + AWS_ACCESS_KEY + "\n" + "aws_secret_access_key=" + AWS_SECRET_ACCESS_KEY + "\n"
    print("creating directories " + configDir + " if not exist")
    os.makedirs(configDir, exist_ok=True)
    file = open(configDir + "/" + configFile, "w")
    file.write(credential)
    file.close()
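# Note (not part of the original script, just a sketch of an alternative):
# instead of writing ~/.aws/credentials, the keys could be handed to boto3
# directly when creating the session, e.g.:
#
#   from boto3 import Session
#   session = Session(aws_access_key_id=AWS_ACCESS_KEY,
#                     aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
#   s3client = session.client('s3')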


def get_all_keys(bucket: str = BUCKET_NAME, prefix: str = PATH_PREFIX, keys=None, marker: str = '') -> list:
    """List every object key under the prefix, following pagination via Marker.

    See: https://qiita.com/ikai/items/38c52e0c459792a4da46
    """
    from boto3 import Session
    s3client = Session().client('s3')
    if keys is None:
        keys = []
    response = s3client.list_objects(Bucket=bucket, Prefix=prefix, Marker=marker)
    if 'Contents' in response:
        keys.extend([content['Key'] for content in response['Contents']])
        # Only recurse while S3 reports the listing as truncated.
        if response.get('IsTruncated'):
            return get_all_keys(bucket=bucket, prefix=prefix, keys=keys, marker=keys[-1])
    return keys
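# Alternative sketch (not part of the original script): boto3's paginator does
# the Marker/IsTruncated bookkeeping itself, e.g.:
#
#   paginator = s3client.get_paginator('list_objects_v2')
#   keys = [obj['Key']
#           for page in paginator.paginate(Bucket=bucket, Prefix=prefix)
#           for obj in page.get('Contents', [])]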

def read_files(keys):
    """Download each object, write "<filename>,<contents>" lines to DATAFILE,
    then upload DATAFILE back to S3 under UPLOAD_PATH_PREFIX."""
    import boto3
    s3 = boto3.resource('s3')
    if os.path.isfile(DATAFILE):
        os.remove(DATAFILE)
    f = open(DATAFILE, 'a')
    for key in keys:
        response = s3.Object(BUCKET_NAME, key).get()
        body = response['Body'].read()
        # Strip the "files/" prefix so only the bare file name is recorded.
        f.write(key.replace(PATH_PREFIX + '/', '', 1) + ',' + body.decode('utf-8'))
    f.close()
    bucket = s3.Bucket(BUCKET_NAME)
    bucket.upload_file(DATAFILE, UPLOAD_PATH_PREFIX + '/' + DATAFILE)


def main_proc():
    # Install boto3 at runtime before it is imported inside the helper functions.
    os.system("pip install boto3")
    set_s3_config(AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY)
    read_files(get_all_keys())
    # Show the generated file in the task log for debugging.
    os.system('cat data.tmp')
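
To try the script outside digdag, the same environment variables that the _env block provides can be set by hand before importing the module. A rough local smoke-test sketch (assumes the file is saved as s3_filename_add.py, boto3 is installed, and the placeholder values are replaced with your own):

```python
# Local smoke test (a sketch; values are placeholders, not real credentials).
import os

os.environ["s3_access_key"] = "<Access key ID>"
os.environ["s3_secret_key"] = "<Secret access key>"
os.environ["s3_bucket"] = "td-test-bucket"
os.environ["path_prefix"] = "files"
os.environ["upload_path_prefix"] = "with_filename"
os.environ["datafile"] = "data.tmp"

# The module reads these variables at import time, so set them before importing.
import s3_filename_add

s3_filename_add.main_proc()
```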
Review comment: L10 - L12 are on the same line. Is this intentional?
Author reply: Thank you for your check! There needs to be a line break there. I will add the line break.