Skip to content

Latest commit

 

History

History
259 lines (170 loc) · 8.61 KB

README.md

File metadata and controls

259 lines (170 loc) · 8.61 KB

데이터 시스템 (데이터 웨어하우스, 데이터 레이크, 빅쿼리, Airflow, 데이터셋 관리, 데이터 품질 모니터링, 데이터 워크플로 구성)

학습한 날짜: 240519

데이터 웨어하우스

Untitled

데이터 웨어하우스란 다양한 데이터 소스를 수집, 통합, 적재, 관리하는 시스템이고,

이 수집, 통합, 적재, 관리를 위해서 ETL 같은 파이프라인으로 시스템을 구성한다.

빠르게 대용량 데이터를 분석하는데 특화되어있는 시스템이라고 생각하면 된다. 이 때 분석은 데이터 웨어하우스의 OLAP 쿼리를 사용해서 분석한다.

OLTP 란? OLAP 란?

Untitled

OLAP: 복잡한 쿼리를 통해 다차원으로 데이터를 분석하는 기술.

이 기술은 거대 데이터를 효율적으로 분석하는데 필요한 기술이기 때문에 제품 데이터 또는 로그 데이터 분석에 적합하다.

그러면 데이터 베이스와 데이터 웨어하우스의 차이가 뭐냐?

로그를 데이터 베이스에 저장하고, 이를 분석해서 사용하면 안되냐? 데이터 베이스는 200만개의 데이터(로그 데이터)라고 하자. 이 데이터에 쿼리도 버거워하고, 이걸 join한다고 하면 거의 불가하거나 지연이 발생해서 적절하지 않다.

Untitled

에드혹 쿼리: 즉각적인 응답을 요구하는 쿼리

그러면 데이터 레이크와 데이터 웨어하우스의 차이가 뭐냐?

  1. 데이터 레이크
    • 비정형 데이터, 반정형 데이터, 정형 데이터 드으이 다양한 형태의 대량 데이터를 보관하기 위한 목적의 저장소.
    • 저장소의 특성상 원시 데이터와 비가공 데이터 모두를 수집받을 수 있어서 수집을 위한 형태 제약이 없다. 그냥 아뭍따 저장해놓고 보자 느낌이네..
    • 이런 특성으로 가급적 원본 데이터를 세부적으로 분석해야할 때, 이로부터 인사이트를 얻을 수 있다.

Untitled
Untitled

데이터 웨어하우스의 장/단점

장점

  1. 거대 데티어 분석에 특화된 구조와 다차원 분석 및 칼럼 특화 조회를 지원해서 복잡 쿼리 도 빠른 응답 시간으로 조회 가능.
  2. 데이터 ETL, 파이프라인, BI 도구 등 다양한 생태계에 호환되는 구조로 연동이 쉽다.
  3. SQL기반으로 정형 데이터를 분석하므로 쉬운 사용성

단점

  1. 높은 비용
  2. 구축 난이도
  3. ETL 구성 비용이 높다.
  4. 비정형 데이터를 미지원 <- 우리 같은 의료 이미지 데이터에서는 ㅠㅠ 데이터 레이크의 구성이 필요.

데이터 웨어하우스의 종류

  1. Google BigQuery
  2. AWS Redshift
  3. Databricks
  4. Snowflake

Google BigQuery 사용해보기

Untitled

특징

  1. 콘솔에 들어가면 빅쿼리 바로 볼 수 있는 거처럼. Fully-managed 서버리스 구조로 서버 스펙이나 서버 상태를 관리할 필요 없이 쿼리에 집중 가능

  2. 사용한 만큼의 금액만큼만 지불하며 사용의기준은 스캔된 데이터의 용량이다.

  3. 정형, 반정형 데이터 적재 및 조회에 적합하다.

  4. 스트리밍 데이터 수집 기능을 제공하기 때문에 데이터 적재 및 분석이 가능하다. ⭐️

    Untitled

    Untitled

  5. Vertex AI 및 BQML 기능을 통한 쿼리를 통해 ML 기능으로 확장이 가능하다. ⭐️⭐️

→ 4, 5번 이 Google BigQuery의 가장 큰 특징이다! ⭐️⭐️⭐️

Big Query 데이터 계층

Untitled

사용해보기

순서

  1. GCP 프로젝트 설정
  2. BigQuery 초기 구성
  3. 위키피디아 데이터를 적재 해 볼 것임 (huggingface 에 이 데이터 있음)
  4. 적재한 위키피디아 데이터에 쿼리를 날려서 다차원 분석을 해보자
  5. Big Query에서 실시간 데이터를 처리하고 분석해보자.

1. GCP 프로젝트 설정

Go https://cloud.google.com

Untitled

그 다음 Run a query in BigQuery 클릭하면 빅쿼리 웹 으로 이동함.

2. BigQuery 초기 구성 - Create Project

Untitled

Untitled

3. 위키피디아 데이터를 BigQuery에 적재

이 때 웹 환경이 아니라 SDK를 사용해서 코드 상에서 해볼 것임

→ 그렇기 때문에 GCP Service Account Key 발급 필요.

1) GCP Service Account Key 발급

Untitled

Untitled

Untitled

Untitled

2) BigQuery를 사용할 수 있는 라이브러리 설치

	pip install pandas-gbq # 빅쿼리에 있는 데이터를 pandas의 데이터셋과 호환시켜주는 라이브러리임.

3) 위키피디아 데이터를 BigQuery에 적재

https://huggingface.co/datasets/wikipedia 여기 가보면

https://huggingface.co/datasets/wikipedia#dataset-card-for-wikipedia 여기에 어떻게 데이터를 불러오는지 적혀있고,

그걸 참고해서 적재해보면, 아래와 같은 코드가 된다.

from datasets import load_dataset
from google.oauth2 import service_account
import pandas_gbq
import pandas as pd

credentials = service_account.Credentials.from_service_account_file(
	# 여기 1) 단계에서 생성한 credentials json file 위치를 넣는다.
)

dataset = load_dataset("wikipedia", language="en", data="20220301")
df = pd.DataFrame(dataset["train"][:100000])
pandas_gbq.to_gbq(
	df,
	"introduction.wikipedia",
	project_id="hjchung-machine-learning", #2. BigQuery 초기 구성 - Create Project 단계에서 만든 프로젝트 이름
	credentials=credentials
)

그리고 이게 실행되는 동안 BigQuery에서 Create dataset을 눌러서 introduction을 생성해준다.

Untitled

4. 적재한 위키피디아 데이터에 쿼리를 날려서 다차원 분석

Untitled

WITH KoreaTexts AS (
	SELECT
		text,
		REGEXP_CONTAINS(LOWER(text), LOWER('Korea')) AS ContainsKorea,
		REGEXP_CONTAINS(text, '[가-힣]+') AS ContainsHangul
	FROM
		'hjchung-machine-learning.introduction.wikipedia'
)

SELECT
		CONCAT(
			ROUNT((SUM(CASE WHEN ContainsHangul THEN 1 ELSE 0 END) / COUNT(*)) * 100, 2),
			'%'
		) AS PercentageOfHangulInKoreaTexts
FROM
	KoreaTexts
WHERE
	ContainsKorea

Untitled

BigQuery에서 실시간 데이터를 처리하고 분석

from google.cloud import bigquery
import uuid
import time
import random
import string


def generate_random_text(length: int = 10) -> str:
    letters = string.ascii_letters + string.digit
    return "".join(random.choice(letters) for i in range(length))


credentials = service_account.Credentials.from_service_account_json(
    # 여기 1) 단계에서 생성한 credentials json file 위치를 넣는다.
)
table_id = "hjchung-machine-learning.introduction.streaming"

try:
    credentials.get_table(table_id)
    print("Table {} already exits.".format(table_id))
except:
    schema = [
        bigquery.SchemaField(name="log_id", field_type="STRING"),
        bigquery.SchemaField(name="text", field_type="STRING"),
        bigquery.SchemaField(name="date", field_type="STRING"),
    ]
    table = bigquery.Table(table_id, schema=schema)
    table = credentials.create_table(table)
    print(
        "Create table {}.{}.{}".format(
            table.project, table.dataset_id, table.table_id)
    )


# 랜덤하게 계속 line을 빅쿼리에 추가해보자.
def insert_new_line() -> None:
    rows_to_insert = (
        {
            "log_id": str(uuid.uuid4()),
            "text": generate_random_text(50),
            "date": int(time.time())
        }
    )


for _ in range(100000):
    insert_new_line()

Untitled

실시간으로 추가된 걸 확인할 수 있다.

BigQuery에서 실시간 데이터를 처리는 어디에 활용할 수 있을까?⭐️

모델 서빙에서 발생한 로그 데이터를 BigQuery 스트리밍 적재하여

⭐️⭐️⭐️model drift 방지를 위한 지속적 학습(Continuous Training; CT) 마련 가능. ⭐️⭐️⭐️

Untitled

여기에 스트리밍 API를 바로 쏘기보단 kafka같은 메세지 브로커를 쓰긴 함.