Add loadfilename as column s3 #306

Open
wants to merge 2 commits into base: master
Changes from 1 commit
58 changes: 58 additions & 0 deletions scenarios/Loadfilename_as_column_s3/README.md
@@ -0,0 +1,58 @@
# Workflow: Scenario (Load file name as a column from S3)

## Scenario

The purpose of this scenario is to capture the name of each loaded file as a data column when loading files from S3 on AWS.


*Steps*
1. Obtain file names and file contents from multiple files in a specific S3 bucket.

   Sample file:
   name: files_aaa_20210401
   contents:
Member: L10 - L12 appear to be on the same line. Is this intentional?

Author: Thank you for checking! There needs to be a line break there. I have added it.

```
aaa
```
The file name includes a date, as in a daily file. The file contents are simply a single item. Prepare several similar files.

2. Register the file name and the contents of the file in the table.
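
For illustration, assuming two sample files named files_aaa_20210401 and files_bbb_20210402 whose contents are `aaa` and `bbb` (the second file is a hypothetical example), the intermediate file (data.tmp) written by the custom script and then loaded into the table would look roughly like this:

    files_aaa_20210401,aaa
    files_bbb_20210402,bbb

Each line pairs the file name (with the "files/" prefix stripped) with the file contents, matching the `filename` and `data` columns defined in load_s3.yml.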

In this scenario, custom scripts are used. Please refer to the Treasure Data documentation for Custom Scripts.

- [Custom Scripts](https://docs.treasuredata.com/display/public/PD/Custom+Scripts)

This scenario is just an example of how to get the file name and file contents using a custom script. Your files do not have to match this exact format; use this code as a reference to create your own.

# How to Run for Server/Client Mode
Here's how to run this sample to try it out.

As preparation, do the following (example commands are sketched after this list):
- Since S3 bucket names are globally unique, change the bucket name from "td-test-bucket" to an appropriate one.
- Create an S3 bucket and create the folders "files" and "with_filename" in it.
- Upload the files in the files folder of this sample to the files folder of the S3 bucket you created.
- Change "database_name" in s3_filename_add.dig to an appropriate database name.
- Remove any files in the local folder before running.
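
As a minimal sketch of the bucket preparation, assuming the AWS CLI is installed, that you pick `my-td-test-bucket` as your bucket name, and that the sample files sit in a local `files/` folder (all three are placeholders for your own setup):

    # Create the bucket (replace with your own globally unique name)
    $ aws s3 mb s3://my-td-test-bucket

    # Upload the sample files under the "files/" prefix; S3 creates the
    # "folder" implicitly when objects are uploaded under that prefix
    $ aws s3 cp files/ s3://my-td-test-bucket/files/ --recursive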


First, please upload the workflow.

    # Upload
    $ td wf push s3_filename_add

Then set the S3 Access key ID and Secret access key as workflow secrets as follows.
Member: This line doesn't look like a sentence. Is it intentional?

Author: I meant to add "Access key ID" and "Secret access key" as comments, but I forgot to write them. Thank you.


    # Access key ID
    td wf secrets --project s3_filename_add --set s3.access_key_id=<Access key ID>
    # Secret access key
    td wf secrets --project s3_filename_add --set s3.secret_access_key=<Secret access key>
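
To confirm the secrets were registered, listing them should work (a sketch; running `td wf secrets --project` without `--set` is expected to print only the registered key names, never the values):

    $ td wf secrets --project s3_filename_add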

You can trigger the session manually to watch it execute.

    # Run
    $ td wf start s3_filename_add s3_filename_add --session now

After execution, "data.tmp" will be created in the "with_filename" folder on S3, and the table "master_with_filename" will be created in TD.
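
As a quick check after the run, you could query the new table with the td CLI (a sketch, assuming the database name you set in s3_filename_add.dig):

    $ td query -w -d database_name "SELECT filename, data FROM master_with_filename"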


# Next Step

If you have any questions, please contact [email protected].
24 changes: 24 additions & 0 deletions scenarios/Loadfilename_as_column_s3/load_s3.yml
@@ -0,0 +1,24 @@
in:
  type: s3
  access_key_id: ${secret:s3.access_key_id}
  secret_access_key: ${secret:s3.secret_access_key}
  bucket: ${s3.bucket}
  path_prefix: ${s3.upload_path_prefix}/${s3.datafile}
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ","
    quote: "\""
    escape: "\""
    trim_if_not_quoted: false
    skip_header_lines: 0
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: filename, type: string}
    - {name: data, type: string}
filters:
  - type: add_time
    to_column: {name: time}
    from_value: {mode: upload_time}
28 changes: 28 additions & 0 deletions scenarios/Loadfilename_as_column_s3/s3_filename_add.dig
@@ -0,0 +1,28 @@
timezone: Asia/Tokyo
_export:
  s3:
    bucket: td-test-bucket
    path_prefix: files
    upload_path_prefix: with_filename
    datafile: data.tmp
  td:
    database: database_name
    table: master_with_filename
    endpoint: api.treasuredata.com

+read_s3files:
  py>: s3_filename_add.main_proc
  _env:
    s3_access_key: ${secret:s3.access_key_id}
    s3_secret_key: ${secret:s3.secret_access_key}
    s3_bucket: ${s3.bucket}
    path_prefix: ${s3.path_prefix}
    upload_path_prefix: ${s3.upload_path_prefix}
    datafile: ${s3.datafile}
  docker:
    image: "digdag/digdag-python:3.9"

+load_from_s3:
  td_load>: load_s3.yml
63 changes: 63 additions & 0 deletions scenarios/Loadfilename_as_column_s3/s3_filename_add.py
@@ -0,0 +1,63 @@
import os

AWS_ACCESS_KEY = os.environ.get('s3_access_key')
AWS_SECRET_ACCESS_KEY = os.environ.get('s3_secret_key')
BUCKET_NAME = os.environ.get('s3_bucket')
PATH_PREFIX = os.environ.get('path_prefix')
UPLOAD_PATH_PREFIX = os.environ.get('upload_path_prefix')
DATAFILE = os.environ.get('datafile')


def set_s3_config(AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY):
    """Write an AWS credentials file so that boto3 picks up the [default] profile."""
    homepath = os.environ.get("HOME")
    configDir = homepath + "/.aws"
    configFile = "credentials"
    credential = ("[default]\n"
                  "aws_access_key_id=" + AWS_ACCESS_KEY + "\n"
                  "aws_secret_access_key=" + AWS_SECRET_ACCESS_KEY + "\n")
    print("creating directory " + configDir + " if it does not exist")
    os.makedirs(configDir, exist_ok=True)
    with open(configDir + "/" + configFile, "w") as file:
        file.write(credential)


def get_all_keys(bucket: str = BUCKET_NAME, prefix: str = PATH_PREFIX,
                 keys: list = None, marker: str = '') -> list:
    """List all object keys under the prefix, following pagination markers.

    See: https://qiita.com/ikai/items/38c52e0c459792a4da46
    """
    # boto3 is imported inside the function because it is installed at runtime by main_proc().
    from boto3 import Session
    if keys is None:
        keys = []
    s3client = Session().client('s3')
    response = s3client.list_objects(Bucket=bucket, Prefix=prefix, Marker=marker)
    if 'Contents' in response:
        keys.extend([content['Key'] for content in response['Contents']])
    # Recurse only while the listing is actually truncated, using the last key as the next marker.
    if response.get('IsTruncated'):
        return get_all_keys(bucket=bucket, prefix=prefix, keys=keys, marker=keys[-1])
    return keys


def read_files(keys):
    """Write "<file name>,<file contents>" lines to DATAFILE and upload it to S3."""
    import boto3
    s3 = boto3.resource('s3')
    if os.path.isfile(DATAFILE):
        os.remove(DATAFILE)
    with open(DATAFILE, 'a') as f:
        for key in keys:
            response = s3.Object(BUCKET_NAME, key).get()
            body = response['Body'].read()
            # Strip the "files/" prefix from the key so only the file name remains.
            # Each source file is expected to end with a newline, so one file becomes one CSV line.
            f.write(key.replace(PATH_PREFIX + '/', '', 1) + ',' + body.decode('utf-8'))
    bucket = s3.Bucket(BUCKET_NAME)
    bucket.upload_file(DATAFILE, UPLOAD_PATH_PREFIX + '/' + DATAFILE)


def main_proc():
    # boto3 is not preinstalled in the digdag-python image, so install it at runtime.
    os.system("pip install boto3")
    set_s3_config(AWS_ACCESS_KEY, AWS_SECRET_ACCESS_KEY)
    read_files(get_all_keys())
    os.system('cat data.tmp')