Skip to content

Commit

Permalink
feat: use requests-mock
Browse files Browse the repository at this point in the history
  • Loading branch information
Andreas Gruhler committed Nov 25, 2024
1 parent 907c522 commit 1ee071f
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 204 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,3 @@
**.env
**.venv
**__pycache__
**vault_data
5 changes: 5 additions & 0 deletions kubernetes/.pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,8 @@
# https://docs.python.org/3/library/logging.html#levels
log_cli = true
log_cli_level = 20
filterwarnings =
# DeprecationWarning: datetime.datetime.utcnow() is deprecated and scheduled
# for removal in a future version. Use timezone-aware objects to represent
# datetimes in UTC: datetime.datetime.now(datetime.UTC).
ignore:.*datetime.datetime.utcnow().*
4 changes: 2 additions & 2 deletions kubernetes/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ mc ls --versions my-snapshots/vault-snapshots-2f848f

## Development and tests

Requirements for running the mock server (`vault_server_mock.py`):
* HashiCorp Vault or OpenBao (`vault` binary)
Vault API requests are mocked with
[requests-mock](https://requests-mock.readthedocs.io).

To prepare the environment:
```bash
Expand Down
1 change: 1 addition & 0 deletions kubernetes/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
hvac
boto3
moto[s3]
requests_mock
pytest
coverage
5 changes: 0 additions & 5 deletions kubernetes/vault_config.hcl

This file was deleted.

98 changes: 0 additions & 98 deletions kubernetes/vault_server_mock.py

This file was deleted.

1 change: 1 addition & 0 deletions kubernetes/vault_snapshot/fixtures/jwt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mock_jwt
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import logging
import boto3
from botocore.exceptions import ClientError
from hvac.api.auth_methods import Kubernetes
import hvac
import os
from pathlib import Path
import datetime

class VaultSnapshot:
Expand All @@ -23,65 +23,84 @@ def __init__(self, **kwargs):

# read input keyword arguments
if "vault_addr" in kwargs:
self.vault_addr = kwargs['vault_addr']
self.vault_addr = kwargs["vault_addr"]
elif "VAULT_ADDR" in os.environ:
self.vault_addr = os.environ['VAULT_ADDR']
self.vault_addr = os.environ["VAULT_ADDR"]
else:
raise NameError("VAULT_ADDR undefined")

if "vault_token" in kwargs:
self.vault_token = kwargs['vault_token']
self.vault_token = kwargs["vault_token"]
elif "VAULT_TOKEN" in os.environ:
self.vault_token = os.environ['VAULT_TOKEN']
self.vault_token = os.environ["VAULT_TOKEN"]
elif "vault_role" in kwargs:
self.vault_role = kwargs["vault_role"]
elif "VAULT_ROLE" in os.environ:
self.vault_role = os.environ["VAULT_ROLE"]
else:
raise NameError("VAULT_TOKEN undefined")
raise NameError("No VAULT_TOKEN or VAULT_ROLE")

if "s3_access_key_id" in kwargs:
self.s3_access_key_id = kwargs['s3_access_key_id']
self.s3_access_key_id = kwargs["s3_access_key_id"]
elif "AWS_ACCESS_KEY_ID" in os.environ:
self.s3_access_key_id = os.environ['AWS_ACCESS_KEY_ID']
self.s3_access_key_id = os.environ["AWS_ACCESS_KEY_ID"]
else:
raise NameError("AWS_ACCESS_KEY_ID undefined")

if "s3_secret_access_key" in kwargs:
self.s3_secret_access_key = kwargs['s3_secret_access_key']
self.s3_secret_access_key = kwargs["s3_secret_access_key"]
elif "AWS_SECRET_ACCESS_KEY" in os.environ:
self.s3_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY']
self.s3_secret_access_key = os.environ["AWS_SECRET_ACCESS_KEY"]
else:
raise NameError("AWS_SECRET_ACCESS_KEY undefined")

if "s3_host" in kwargs:
self.s3_host = kwargs['s3_host']
self.s3_host = kwargs["s3_host"]
elif "S3_HOST" in os.environ:
self.s3_host = os.environ['S3_HOST']
self.s3_host = os.environ["S3_HOST"]
else:
raise NameError("S3_HOST undefined")

if "s3_bucket" in kwargs:
self.s3_bucket = kwargs['s3_bucket']
self.s3_bucket = kwargs["s3_bucket"]
elif "S3_BUCKET" in os.environ:
self.s3_bucket = os.environ['S3_BUCKET']
self.s3_bucket = os.environ["S3_BUCKET"]
else:
raise NameError("S3_BUCKET undefined")

if "jwt_secret_path" in kwargs:
self.jwt_secret_path = kwargs["jwt_secret_path"]
elif "JWT_SECRET_PATH" in os.environ:
self.s3_bucket = os.environ["JWT_SECRET_PATH"]
else:
# default Kubernetes serviceaccount JWT secret path
self.jwt_secret_path = "/var/run/secrets/kubernetes.io/serviceaccount/token"

# Boto S3 client
# * https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html
self.s3_client = boto3.client(service_name='s3',
self.s3_client = boto3.client(service_name="s3",
endpoint_url=self.s3_host,
aws_access_key_id=self.s3_access_key_id,
aws_secret_access_key=self.s3_secret_access_key)

# Authenticate with Kubernetes ServiceAccount if vault_token is empty
# https://hvac.readthedocs.io/en/stable/usage/auth_methods/kubernetes.html
#hvac_client = hvac.Client(url=url, verify=certificate_path)
#f = open('/var/run/secrets/kubernetes.io/serviceaccount/token')
#jwt = f.read()
####VAULT_TOKEN=$(vault write -field=token auth/kubernetes/login role="${VAULT_ROLE}" jwt="${JWT}")
#Kubernetes(hvac_client.adapter).login(role=role, jwt=jwt)

self.logger.info(f"Connecting to Vault API {self.vault_addr}")
self.hvac_client = hvac.Client(url=self.vault_addr)
self.hvac_client.token = self.vault_token

# try setting VAULT_TOKEN if exists
if hasattr(self, "vault_token") and len(self.vault_token) > 0:
self.hvac_client.token = self.vault_token
elif Path(self.jwt_secret_path).exists():
f = open(self.jwt_secret_path)

# Authenticate with Kubernetes ServiceAccount if vault_token is empty
# https://hvac.readthedocs.io/en/stable/usage/auth_methods/kubernetes.html
login_resp = hvac.api.auth_methods.Kubernetes(self.hvac_client.adapter).login(
role=self.vault_role,
jwt=f.read()
)
self.hvac_client.token = login_resp["auth"]["client_token"]
else:
raise Exception("Unable to authenticate with VAULT_TOKEN or JWT")

assert self.hvac_client.is_authenticated()

Expand Down Expand Up @@ -117,7 +136,7 @@ def snapshot(self):

# Iterate and remove expired snapshots:
# https://boto3.amazonaws.com/v1/documentation/api/latest/guide/migrations3.html
s3 = boto3.resource(service_name='s3',
s3 = boto3.resource(service_name="s3",
endpoint_url=self.s3_host,
aws_access_key_id=self.s3_access_key_id,
aws_secret_access_key=self.s3_secret_access_key)
Expand Down
85 changes: 85 additions & 0 deletions kubernetes/vault_snapshot/vault_snapshot_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import logging
import boto3
import pytest
import requests_mock
from moto import mock_aws

from vault_snapshot import VaultSnapshot

logger = logging.getLogger(__name__)

class TestVaultSnapshots:
"""
Test Vault snapshot functionality.
"""

@mock_aws
@requests_mock.Mocker(kw="mock")
def test_snapshot_with_token(self, **kwargs):
"""
Test snapshot functionality with Token auth.
https://docs.getmoto.org/en/latest/docs/getting_started.html#decorator
"""

bucket_name = "vault-snapshots"
region_name = "us-east-1"
conn = boto3.resource("s3", region_name=region_name)
# We need to create the bucket since this is all in Moto's 'virtual' AWS account
conn.create_bucket(Bucket=bucket_name)

kwargs['mock'].get("http://127.0.0.1:8200/v1/auth/token/lookup-self", text="mock")
kwargs['mock'].get("http://127.0.0.1:8200/v1/sys/storage/raft/snapshot", text="blob")

vault_snapshot = VaultSnapshot(
vault_addr="http://127.0.0.1:8200",
vault_token="root",
s3_access_key_id="test",
s3_secret_access_key="test",
s3_host=f"https://s3.{region_name}.amazonaws.com",
s3_bucket=bucket_name
)
file_name = vault_snapshot.snapshot()

s3obj = conn.Object(bucket_name, file_name).get()
body = s3obj["Body"]

assert body.read() == b"blob"

@mock_aws
@requests_mock.Mocker(kw="mock")
def test_snapshot_with_jwt(self, **kwargs):
"""
Test snapshot functionality with JWT auth.
"""

kwargs['mock'].post("http://127.0.0.1:8200/v1/auth/kubernetes/login", json={
"auth": {
"client_token": "root"
}
})
kwargs['mock'].get("http://127.0.0.1:8200/v1/sys/storage/raft/snapshot", text="blob")
kwargs['mock'].get("http://127.0.0.1:8200/v1/auth/token/lookup-self", text="blob")

bucket_name = "vault-snapshots"
region_name = "us-east-1"
conn = boto3.resource("s3", region_name=region_name)
# We need to create the bucket since this is all in Moto's 'virtual' AWS account
conn.create_bucket(Bucket=bucket_name)

vault_snapshot = VaultSnapshot(
vault_addr="http://127.0.0.1:8200",
# the mock server assumes a "default" role
vault_role="default",
jwt_secret_path="./vault_snapshot/fixtures/jwt",
s3_access_key_id="test",
s3_secret_access_key="test",
s3_host=f"https://s3.{region_name}.amazonaws.com",
s3_bucket=bucket_name
)
file_name = vault_snapshot.snapshot()

s3obj = conn.Object(bucket_name, file_name).get()
body = s3obj["Body"]

assert body.read() == b"blob"
Loading

0 comments on commit 1ee071f

Please sign in to comment.