-
-
Notifications
You must be signed in to change notification settings - Fork 306
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for Name.com #507
base: master
Are you sure you want to change the base?
Changes from 1 commit
766fb98
da2f11f
fc5d8b8
44e824d
c87dd64
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,212 @@ | ||
"""Module provider for Name.com""" | ||
from __future__ import absolute_import | ||
|
||
import logging | ||
|
||
from requests import HTTPError, Session | ||
from requests.auth import HTTPBasicAuth | ||
|
||
from lexicon.providers.base import Provider as BaseProvider | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
NAMESERVER_DOMAINS = ['name.com'] | ||
|
||
DUPLICATE_ERROR = { | ||
'message': 'Invalid Argument', | ||
'details': 'Parameter Value Error - Duplicate Record' | ||
} | ||
|
||
|
||
def provider_parser(subparser): | ||
"""Configure a subparser for Name.com.""" | ||
|
||
subparser.add_argument('--auth-username', help='specify a username') | ||
subparser.add_argument('--auth-token', help='specify an API token') | ||
|
||
|
||
class NamecomLoader(object): # pylint: disable=useless-object-inheritance,too-few-public-methods | ||
"""Loader that handles pagination for the Name.com provider.""" | ||
|
||
def __init__(self, get, url, data_key, next_page=1): | ||
self.get = get | ||
self.url = url | ||
self.data_key = data_key | ||
self.next_page = next_page | ||
|
||
def __iter__(self): | ||
while self.next_page: | ||
response = self.get(self.url, {'page': self.next_page}) | ||
for data in response[self.data_key]: | ||
yield data | ||
self.next_page = response.get('next_page') | ||
|
||
|
||
class NamecomProvider(BaseProvider): | ||
"""Provider implementation for Name.com.""" | ||
|
||
def __init__(self, config): | ||
super(Provider, self).__init__(config) | ||
self.api_endpoint = 'https://api.name.com/v4' | ||
self.session = Session() | ||
|
||
def _authenticate(self): | ||
self.session.auth = HTTPBasicAuth( | ||
username=self._get_provider_option('auth_username'), | ||
password=self._get_provider_option('auth_token') | ||
) | ||
|
||
# checking domain existence | ||
domain_name = self.domain | ||
for domain in NamecomLoader(self._get, '/domains', 'domains'): | ||
if domain['domainName'] == domain_name: | ||
self.domain_id = domain_name | ||
return | ||
|
||
raise Exception('{} domain does not exist'.format(domain_name)) | ||
|
||
def _create_record(self, rtype, name, content): | ||
data = { | ||
'type': rtype, | ||
'host': self._relative_name(name), | ||
'answer': content, | ||
'ttl': self._get_lexicon_option('ttl') | ||
} | ||
|
||
if rtype in ('MX', 'SRV'): | ||
# despite the documentation says a priority is | ||
# required for MX and SRV, it's actually optional | ||
priority = self._get_lexicon_option('priority') | ||
if priority: | ||
data['priority'] = priority | ||
|
||
url = '/domains/{}/records'.format(self.domain) | ||
try: | ||
record_id = self._post(url, data)['id'] | ||
except HTTPError as error: | ||
response = error.response | ||
if response.status_code == 400 and \ | ||
response.json() == DUPLICATE_ERROR: | ||
LOGGER.warning( | ||
'create_record: duplicate record has been skipped' | ||
) | ||
return True | ||
raise | ||
|
||
LOGGER.debug('create_record: record %s has been created', record_id) | ||
|
||
return record_id | ||
|
||
def _list_records(self, rtype=None, name=None, content=None): | ||
url = '/domains/{}/records'.format(self.domain) | ||
records = [] | ||
|
||
for raw in NamecomLoader(self._get, url, 'records'): | ||
record = { | ||
'id': raw['id'], | ||
'type': raw['type'], | ||
'name': raw['fqdn'][:-1], | ||
'ttl': raw['ttl'], | ||
'content': raw['answer'], | ||
} | ||
records.append(record) | ||
|
||
LOGGER.debug('list_records: retrieved %s records', len(records)) | ||
|
||
if rtype: | ||
records = (record for record in records if record['type'] == rtype) | ||
if name: | ||
name = self._full_name(name) | ||
records = (record for record in records if record['name'] == name) | ||
if content: | ||
records = (record for record in records | ||
if record['content'] == content) | ||
|
||
if not isinstance(records, list): | ||
records = list(records) | ||
LOGGER.debug('list_records: filtered %s records', len(records)) | ||
|
||
return records | ||
|
||
def _update_record(self, identifier, rtype=None, name=None, content=None): | ||
if not identifier: | ||
if not (rtype and name): | ||
raise ValueError( | ||
'Record identifier or rtype+name must be specified' | ||
) | ||
records = self._list_records(rtype, name) | ||
if not records: | ||
raise Exception('There is no record to update') | ||
|
||
if len(records) > 1: | ||
filtered_records = [record for record in records | ||
if record['content'] == content] | ||
if filtered_records: | ||
records = filtered_records | ||
|
||
if len(records) > 1: | ||
raise Exception( | ||
'There are multiple records to update: {}'.format( | ||
', '.join(record['id'] for record in records) | ||
) | ||
) | ||
|
||
record_id = records[0]['id'] | ||
else: | ||
record_id = identifier | ||
|
||
data = {'ttl': self._get_lexicon_option('ttl')} | ||
|
||
# even though the documentation says a type and an answer | ||
# are required, they are not required actually | ||
if rtype: | ||
data['type'] = rtype | ||
if name: | ||
data['host'] = self._relative_name(name) | ||
if content: | ||
data['answer'] = content | ||
|
||
url = '/domains/{}/records/{}'.format(self.domain, record_id) | ||
record_id = self._put(url, data)['id'] | ||
logging.debug('update_record: record %s has been updated', record_id) | ||
|
||
return record_id | ||
|
||
def _delete_record(self, identifier=None, | ||
rtype=None, name=None, content=None): | ||
if not identifier: | ||
if not (rtype and name): | ||
raise ValueError( | ||
'Record identifier or rtype+name must be specified' | ||
) | ||
records = self._list_records(rtype, name, content) | ||
if not records: | ||
LOGGER.warning('delete_record: there is no record to delete') | ||
return None | ||
record_ids = tuple(record['id'] for record in records) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A comprehension list would be a better fit here, since it will be iterated after ( |
||
else: | ||
record_ids = (identifier,) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here: |
||
|
||
for record_id in record_ids: | ||
url = '/domains/{}/records/{}'.format(self.domain, record_id) | ||
self._delete(url) | ||
LOGGER.debug( | ||
'delete_record: record %s has been deleted', record_id | ||
) | ||
|
||
return record_ids if len(record_ids) > 1 else record_ids[0] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Our specification about delete is to return |
||
|
||
def _get_raw_record(self, record_id): | ||
url = '/domains/{}/records/{}'.format(self.domain, record_id) | ||
return self._get(url) | ||
|
||
def _request(self, action='GET', url='/', data=None, query_params=None): | ||
response = self.session.request(method=action, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it strictly required to use an HTTP session with pre-authentication? I would prefer if possible to use plain request each time because I saw from my experience that HTTP sessions are raising their own problems. However if it speeds up the requests, I am ok with it if it works for Name.com. |
||
url=self.api_endpoint + url, | ||
json=data, | ||
params=query_params) | ||
response.raise_for_status() | ||
return response.json() | ||
|
||
|
||
Provider = NamecomProvider |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
"""Integration tests for Name.com""" | ||
import json | ||
from unittest import TestCase | ||
|
||
import pytest | ||
from mock import ANY, Mock, patch | ||
from requests import HTTPError | ||
|
||
from lexicon.config import DictConfigSource | ||
from lexicon.providers.namecom import provider_parser | ||
from lexicon.tests.providers.integration_tests import ( | ||
IntegrationTests, _vcr_integration_test | ||
) | ||
|
||
|
||
# Hook into testing framework by inheriting unittest.TestCase and reuse | ||
# the tests which *each and every* implementation of the interface must | ||
# pass, by inheritance from integration_tests.IntegrationTests | ||
class NamecomProviderTests(TestCase, IntegrationTests): | ||
"""TestCase for Name.com""" | ||
|
||
# I don't think we really need some docstrings here. | ||
# pylint: disable=missing-function-docstring | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In fact, most of the time I even put this pylint disable directive at the top of the test module ;) |
||
|
||
provider_name = 'namecom' | ||
domain = 'mim.pw' | ||
|
||
def _filter_headers(self): | ||
return ['Authorization', 'Cookie'] | ||
|
||
def _filter_response(self, response): | ||
headers = response['headers'] | ||
headers.pop('Set-Cookie', None) | ||
headers.pop('content-length', None) | ||
|
||
if response['status']['code'] == 200: | ||
try: | ||
data = json.loads(response['body']['string'].decode()) | ||
except ValueError: | ||
pass | ||
else: | ||
if 'records' in data: | ||
min_id = 10 ** 8 | ||
data['records'] = [ | ||
record for record in data['records'] | ||
if record['id'] > min_id | ||
] | ||
response['body']['string'] = json.dumps(data).encode() | ||
|
||
return response | ||
|
||
########################### | ||
# Provider.authenticate() # | ||
########################### | ||
@_vcr_integration_test | ||
def test_provider_authenticate(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you rename this test to not shadow the existing one in |
||
provider = self._construct_authenticated_provider() | ||
assert provider.session.auth | ||
|
||
############################ | ||
# Provider.create_record() # | ||
############################ | ||
@_vcr_integration_test | ||
def test_provider_when_calling_create_record_for_MX_with_priority(self): # pylint: disable=invalid-name | ||
priority = 42 | ||
config = self._test_config() | ||
config.add_config_source(DictConfigSource({'priority': priority}), 0) | ||
provider = self.provider_module.Provider(config) | ||
provider.authenticate() | ||
|
||
record_id = provider.create_record('MX', 'mx.test1', self.domain) | ||
assert provider._get_raw_record(record_id)['priority'] == priority # pylint: disable=protected-access | ||
|
||
@_vcr_integration_test | ||
def test_provider_when_calling_create_record_for_MX_with_no_priority(self): # pylint: disable=invalid-name | ||
provider = self._construct_authenticated_provider() | ||
record_id = provider.create_record('MX', 'mx.test2', self.domain) | ||
assert 'priority' not in provider._get_raw_record(record_id) # pylint: disable=protected-access | ||
|
||
@_vcr_integration_test | ||
def test_provider_when_calling_create_record_should_fail_on_http_error(self): | ||
provider = self._construct_authenticated_provider() | ||
error = HTTPError(response=Mock()) | ||
with patch.object(provider, '_request', side_effect=error): | ||
with pytest.raises(HTTPError): | ||
provider.create_record('TXT', 'httperror', 'HTTPError') | ||
|
||
############################ | ||
# Provider.update_record() # | ||
############################ | ||
@_vcr_integration_test | ||
def test_provider_when_calling_update_record_with_no_identifier_or_rtype_and_name_should_fail(self): # pylint: disable=line-too-long | ||
provider = self._construct_authenticated_provider() | ||
with pytest.raises(ValueError): | ||
provider.update_record(None) | ||
|
||
@_vcr_integration_test | ||
def test_provider_when_calling_update_record_should_fail_if_no_record_to_update(self): | ||
provider = self._construct_authenticated_provider() | ||
with pytest.raises(Exception): | ||
provider.update_record(None, 'TXT', 'missingrecord') | ||
|
||
@_vcr_integration_test | ||
def test_provider_when_calling_update_record_should_fail_if_multiple_records_to_update(self): | ||
provider = self._construct_authenticated_provider() | ||
provider.create_record('TXT', 'multiple.test', 'foo') | ||
provider.create_record('TXT', 'multiple.test', 'bar') | ||
with pytest.raises(Exception): | ||
provider.update_record(None, 'TXT', 'multiple.test', 'updated') | ||
|
||
@_vcr_integration_test | ||
def test_provider_when_calling_update_record_filter_by_content_should_pass(self): | ||
provider = self._construct_authenticated_provider() | ||
provider.create_record('TXT', 'multiple.test', 'foo') | ||
provider.create_record('TXT', 'multiple.test', 'bar') | ||
assert provider.update_record(None, 'TXT', 'multiple.test', 'foo') | ||
|
||
@_vcr_integration_test | ||
def test_provider_when_calling_update_record_by_identifier_with_no_other_args_should_pass(self): | ||
provider = self._construct_authenticated_provider() | ||
record_id = provider.create_record('TXT', 'update.test', 'foo') | ||
assert provider.update_record(record_id) | ||
|
||
############################ | ||
# Provider.delete_record() # | ||
############################ | ||
@_vcr_integration_test | ||
def test_provider_when_calling_delete_record_with_no_identifier_or_rtype_and_name_should_fail(self): # pylint: disable=line-too-long | ||
provider = self._construct_authenticated_provider() | ||
with pytest.raises(ValueError): | ||
provider.delete_record() | ||
|
||
@_vcr_integration_test | ||
@patch('lexicon.providers.namecom.LOGGER.warning') | ||
def test_provider_when_calling_delete_record_should_pass_if_no_record_to_delete(self, warning): | ||
provider = self._construct_authenticated_provider() | ||
provider.delete_record(None, 'TXT', 'missingrecord') | ||
warning.assert_called_once() | ||
assert 'no record' in warning.call_args.args[0] | ||
|
||
|
||
def test_subparser_configuration(): | ||
"""Tests the provider_parser method.""" | ||
|
||
subparser = Mock() | ||
provider_parser(subparser) | ||
subparser.add_argument.assert_any_call('--auth-username', help=ANY) | ||
subparser.add_argument.assert_any_call('--auth-token', help=ANY) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you have a specific reason to use comprehension tuples above which then need to cast back tuples into list here ? From what I see, you could use comprehension lists directly (
[record for record in records if ...]
) above, and avoid this cast.