forked from koodaamo/tnefparse
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request koodaamo#7 from agaridata/vt_tests
BP-95: Add VT caching
- Loading branch information
Showing
8 changed files
with
194 additions
and
32 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
import time | ||
from collections import OrderedDict | ||
from orator import Model | ||
from typing import Callable | ||
|
||
MAX_CACHE_SIZE = 4096 | ||
MALIGN_TTL = 12 * 60 * 60 | ||
BENIGN_TTL = 30 * 60 | ||
PENDING_TTL = 5 * 60 | ||
|
||
|
||
class ExpiringLRUCache: | ||
def __init__(self, max_len: int) -> None: | ||
self.dict: OrderedDict = OrderedDict() | ||
self.max_len: int = max_len | ||
|
||
def get(self, key: str): | ||
item, age_out_time = self.dict.get(key, (None, 0)) | ||
if age_out_time >= time.time(): | ||
del self.dict[key] | ||
return item | ||
|
||
def set(self, key: str, value: Model, age_out_seconds: int): | ||
if key in self.dict: | ||
del self.dict[key] | ||
|
||
self.dict[key] = (value, time.time() + age_out_seconds) | ||
|
||
if len(self.dict) > self.max_len: | ||
self.dict.popitem(last=False) | ||
|
||
def __repr__(self): | ||
return '%s(%r)' % (self.__class__.__name__, self.dict.items()) | ||
|
||
|
||
class Scanner: | ||
def __init__(self, model: Model, scanner: Callable) -> None: | ||
self.cache = ExpiringLRUCache(MAX_CACHE_SIZE) | ||
self.model = model | ||
self.scanner: Callable = scanner | ||
|
||
def cache_model(self, model): | ||
self.cache.set(model.resource, model, MALIGN_TTL if model.malicious else BENIGN_TTL) | ||
|
||
def scan(self, *args) -> Model: | ||
resource = args[0] | ||
|
||
mem = self.cache.get(resource) | ||
if mem: | ||
return mem | ||
|
||
model = self.model.first_or_new(resource=resource) | ||
if model.exists: | ||
self.cache_model(model) | ||
return model | ||
|
||
scan = self.scanner(*args) | ||
if scan: | ||
model.results = scan | ||
model.save() | ||
self.cache_model(model) | ||
else: | ||
model.pending = True | ||
self.cache.set(resource, model, PENDING_TTL) | ||
return model |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
pytest | ||
moto | ||
requests_mock |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
import unittest | ||
import itertools | ||
from unittest.mock import MagicMock | ||
from orator.orm import Model | ||
|
||
from radon.scanner import Scanner | ||
|
||
class TestScanner(unittest.TestCase): | ||
|
||
def run_scan(self, in_cache: bool, in_db: bool, in_api: bool): | ||
def lookup(r): | ||
if r == 'lookup': | ||
return {'stuff': 'api'} | ||
else: | ||
return None | ||
|
||
model_instance = MagicMock(spec=Model) | ||
model_instance.resource = 'frogs' | ||
model_instance.exists = False | ||
model_instance.pending = None | ||
model_instance.malicious = None | ||
model_instance.results = {'stuff': 'in_db'} | ||
|
||
DummyModel = MagicMock(spec=Model) | ||
DummyModel.first_or_new.return_value = model_instance | ||
|
||
scanner = Scanner(DummyModel, lookup) | ||
|
||
if in_api: | ||
model_instance.resource = 'lookup' | ||
|
||
if in_db: | ||
model_instance.exists = True | ||
|
||
if in_cache: | ||
model_instance.results = {'stuff': 'cached'} | ||
scanner.cache.set(model_instance.resource, model_instance, 10) | ||
|
||
return scanner.scan(model_instance.resource) | ||
|
||
def test_scanner(self): | ||
for args in itertools.product([True, False], repeat=3): | ||
m = self.run_scan(*args) | ||
in_cache, in_db, in_api = args | ||
if in_cache: | ||
assert not m.pending | ||
assert m.results['stuff'] == 'cached' | ||
else: | ||
if in_db: | ||
assert not m.pending | ||
assert m.results['stuff'] == 'in_db' | ||
else: | ||
if in_api: | ||
assert not m.pending | ||
assert m.results['stuff'] == 'api' | ||
else: | ||
assert m.pending |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
import unittest | ||
import requests_mock | ||
from io import StringIO | ||
|
||
from radon.virus_total import vt_url_report, vt_file_report | ||
|
||
class TestVirusTotal(unittest.TestCase): | ||
|
||
def test_uri(self): | ||
with requests_mock.Mocker() as mock: | ||
mock.post('/vtapi/v2/url/report', json={}) | ||
assert vt_url_report('http://agari.com/') is None | ||
mock.post('/vtapi/v2/url/report', json={'scans': [1,2,3], 'positives': 3}) | ||
assert vt_url_report('http://agari.com/')['positives'] == 3 | ||
|
||
def test_file(self): | ||
with requests_mock.Mocker() as mock: | ||
mock.post('/vtapi/v2/file/report', json={}) | ||
mock.post('/vtapi/v2/file/scan', json={}) | ||
fh = StringIO('contents') | ||
assert vt_file_report('abc','name', fh) is None | ||
assert mock.call_count == 2 | ||
mock.post('/vtapi/v2/file/report', json={'scans': [1,2], 'positives': 2}) | ||
assert vt_file_report('abc','name', fh)['positives'] == 2 | ||
assert mock.call_count == 3 |