Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feature/nibbles
Browse files Browse the repository at this point in the history
  • Loading branch information
originalsouth committed Nov 21, 2024
2 parents bd59705 + f751a99 commit e9a4576
Show file tree
Hide file tree
Showing 28 changed files with 617 additions and 277 deletions.
3 changes: 0 additions & 3 deletions .env-dist
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ BYTES_DB_URI=postgresql://${BYTES_DB_USER}:${BYTES_DB_PASSWORD}@postgres:5432/${
# --- Octopoes --- #
# See `octopoes/octopoes/config/settings.py`

# Number of Celery workers (for the Octopoes API worker) that need to be started
CELERY_WORKER_CONCURRENCY=${CELERY_WORKER_CONCURRENCY:-4}

# --- Mula --- #
# See `mula/scheduler/config/settings.py`

Expand Down
38 changes: 17 additions & 21 deletions boefjes/boefjes/dependencies/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
from boefjes.storage.interfaces import (
ConfigStorage,
DuplicatePlugin,
IntegrityError,
NotFound,
PluginNotFound,
PluginStorage,
SettingsNotConformingToSchema,
UniqueViolation,
)

logger = structlog.get_logger(__name__)
Expand Down Expand Up @@ -49,9 +49,9 @@ def get_all(self, organisation_id: str) -> list[PluginType]:
return [self._set_plugin_enabled(plugin, organisation_id) for plugin in all_plugins.values()]

def _get_all_without_enabled(self) -> dict[str, PluginType]:
all_plugins = {plugin.id: plugin for plugin in self.local_repo.get_all()}
all_plugins = {plugin.id: plugin for plugin in self.plugin_storage.get_all()}

for plugin in self.plugin_storage.get_all():
for plugin in self.local_repo.get_all(): # Local plugins take precedence
all_plugins[plugin.id] = plugin

return all_plugins
Expand Down Expand Up @@ -94,7 +94,7 @@ def clone_settings_to_organisation(self, from_organisation: str, to_organisation
self.set_enabled_by_id(plugin_id, to_organisation, enabled=True)

def upsert_settings(self, settings: dict, organisation_id: str, plugin_id: str):
self._assert_settings_match_schema(settings, plugin_id)
self._assert_settings_match_schema(settings, plugin_id, organisation_id)
self._put_boefje(plugin_id)

return self.config_storage.upsert(organisation_id, plugin_id, settings=settings)
Expand All @@ -113,29 +113,25 @@ def create_boefje(self, boefje: Boefje) -> None:
try:
with self.plugin_storage as storage:
storage.create_boefje(boefje)
except IntegrityError as error:
raise DuplicatePlugin(self._translate_duplicate_plugin(error.message))
except UniqueViolation as error:
raise DuplicatePlugin(error.field)
except KeyError:
try:
with self.plugin_storage as storage:
storage.create_boefje(boefje)
except IntegrityError as error:
raise DuplicatePlugin(self._translate_duplicate_plugin(error.message))

def _translate_duplicate_plugin(self, error_message):
translations = {"boefje_plugin_id": "id", "boefje_name": "name"}
return next((value for key, value in translations.items() if key in error_message), None)
except UniqueViolation as error:
raise DuplicatePlugin(error.field)

def create_normalizer(self, normalizer: Normalizer) -> None:
try:
self.local_repo.by_id(normalizer.id)
raise DuplicatePlugin("id")
raise DuplicatePlugin(field="id")
except KeyError:
try:
plugin = self.local_repo.by_name(normalizer.name)

if plugin.types == "normalizer":
raise DuplicatePlugin("name")
raise DuplicatePlugin(field="name")
else:
self.plugin_storage.create_normalizer(normalizer)
except KeyError:
Expand Down Expand Up @@ -177,12 +173,12 @@ def delete_settings(self, organisation_id: str, plugin_id: str):
# We don't check the schema anymore because we can provide entries through the global environment as well

def schema(self, plugin_id: str) -> dict | None:
try:
boefje = self.plugin_storage.boefje_by_id(plugin_id)
plugin = self._get_all_without_enabled().get(plugin_id)

return boefje.boefje_schema
except PluginNotFound:
return self.local_repo.schema(plugin_id)
if plugin is None or not isinstance(plugin, Boefje):
return None

return plugin.boefje_schema

def cover(self, plugin_id: str) -> Path:
try:
Expand Down Expand Up @@ -212,8 +208,8 @@ def set_enabled_by_id(self, plugin_id: str, organisation_id: str, enabled: bool)

self.config_storage.upsert(organisation_id, plugin_id, enabled=enabled)

def _assert_settings_match_schema(self, all_settings: dict, plugin_id: str):
schema = self.schema(plugin_id)
def _assert_settings_match_schema(self, all_settings: dict, plugin_id: str, organisation_id: str):
schema = self.by_plugin_id(plugin_id, organisation_id).boefje_schema

if schema: # No schema means that there is nothing to assert
try:
Expand Down
4 changes: 2 additions & 2 deletions boefjes/boefjes/plugins/kat_fierce/boefje.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{
"id": "fierce",
"name": "Fierce",
"description": "Perform DNS reconnaissance using Fierce. Helps to locate non-contiguous IP space and hostnames against specified hostnames. No exploitation is performed.",
"description": "Perform DNS reconnaissance using Fierce. Helps to locate non-contiguous IP space and hostnames against specified hostnames. No exploitation is performed. Beware if your DNS is managed by an external party. This boefjes performs a brute force attack against the name server.",
"consumes": [
"Hostname"
],
"scan_level": 1
"scan_level": 3
}
4 changes: 2 additions & 2 deletions boefjes/boefjes/plugins/pdio_subfinder/boefje.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{
"id": "pdio-subfinder",
"name": "Subfinder",
"description": "A subdomain discovery tool. (projectdiscovery.io)",
"description": "A subdomain discovery tool. (projectdiscovery.io). Returns valid subdomains for websites using passive online sources. Beware that many of the online sources require their own API key to get more accurate data.",
"consumes": [
"Hostname"
],
"environment_keys": [
"SUBFINDER_RATE_LIMIT",
"SUBFINDER_VERSION"
],
"scan_level": 2
"scan_level": 1
}
4 changes: 2 additions & 2 deletions boefjes/boefjes/sql/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from sqlalchemy.orm import Session
from typing_extensions import Self

from boefjes.storage.interfaces import IntegrityError, StorageError
from boefjes.storage.interfaces import IntegrityError, StorageError, UniqueViolation

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -40,7 +40,7 @@ def __exit__(self, exc_type: type[Exception], exc_value: str, exc_traceback: str
self.session.commit()
except exc.IntegrityError as e:
if isinstance(e.orig, errors.UniqueViolation):
raise IntegrityError(str(e.orig))
raise UniqueViolation(str(e.orig))
raise IntegrityError("An integrity error occurred") from e
except exc.DatabaseError as e:
raise StorageError("A storage error occurred") from e
Expand Down
19 changes: 17 additions & 2 deletions boefjes/boefjes/storage/interfaces.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from abc import ABC

from boefjes.models import Boefje, Normalizer, Organisation, PluginType
Expand All @@ -17,6 +18,20 @@ def __init__(self, message: str):
self.message = message


class UniqueViolation(IntegrityError):
def __init__(self, message: str):
self.field = self._get_field_name(message)
self.message = message

def _get_field_name(self, message: str) -> str | None:
matches = re.findall(r"Key \((.*)\)=", message)

if matches:
return matches[0]

return None


class SettingsNotConformingToSchema(StorageError):
def __init__(self, plugin_id: str, validation_error: str):
super().__init__(f"Settings for plugin {plugin_id} are not conform the plugin schema: {validation_error}")
Expand Down Expand Up @@ -56,8 +71,8 @@ def __init__(self, plugin_id: str):


class DuplicatePlugin(NotAllowed):
def __init__(self, key: str):
super().__init__(f"Duplicate plugin {key}")
def __init__(self, field: str | None):
super().__init__(f"Duplicate plugin: a plugin with this {field} already exists")


class OrganisationStorage(ABC):
Expand Down
18 changes: 14 additions & 4 deletions boefjes/tests/integration/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ def test_cannot_add_plugin_reserved_id(test_client, organisation):
boefje = Boefje(id="dns-records", name="My test boefje", static=False)
response = test_client.post(f"/v1/organisations/{organisation.id}/plugins", content=boefje.model_dump_json())
assert response.status_code == 400
assert response.json() == {"detail": "Duplicate plugin id"}
assert response.json() == {"detail": "Duplicate plugin: a plugin with this id already exists"}

normalizer = Normalizer(id="kat_nmap_normalize", name="My test normalizer")
response = test_client.post(f"/v1/organisations/{organisation.id}/plugins", content=normalizer.model_dump_json())
assert response.status_code == 400
assert response.json() == {"detail": "Duplicate plugin id"}
assert response.json() == {"detail": "Duplicate plugin: a plugin with this id already exists"}


def test_add_boefje(test_client, organisation):
Expand Down Expand Up @@ -80,7 +80,7 @@ def test_cannot_add_static_plugin_with_duplicate_name(test_client, organisation)
boefje = Boefje(id="test_plugin", name="DNS records", static=False)
response = test_client.post(f"/v1/organisations/{organisation.id}/plugins", content=boefje.model_dump_json())
assert response.status_code == 400
assert response.json() == {"detail": "Duplicate plugin name"}
assert response.json() == {"detail": "Duplicate plugin: a plugin with this name already exists"}


def test_cannot_add_plugin_with_duplicate_name(test_client, organisation):
Expand All @@ -91,7 +91,7 @@ def test_cannot_add_plugin_with_duplicate_name(test_client, organisation):
boefje = Boefje(id="test_plugin_2", name="My test boefje", static=False)
response = test_client.post(f"/v1/organisations/{organisation.id}/plugins", content=boefje.model_dump_json())
assert response.status_code == 400
assert response.json() == {"detail": "Duplicate plugin name"}
assert response.json() == {"detail": "Duplicate plugin: a plugin with this name already exists"}

normalizer = Normalizer(id="test_normalizer", name="My test normalizer", static=False)
response = test_client.post(f"/v1/organisations/{organisation.id}/plugins", content=normalizer.model_dump_json())
Expand Down Expand Up @@ -169,6 +169,16 @@ def test_cannot_create_boefje_with_invalid_schema(test_client, organisation):
assert r.status_code == 422


def test_schema_is_taken_from_disk(test_client, organisation, session):
# creates a database record of dns-records
test_client.patch(f"/v1/organisations/{organisation.id}/plugins/dns-records", json={"enabled": True})
session.execute("UPDATE boefje set schema = null where plugin_id = 'dns-records'")
session.commit()

response = test_client.get(f"/v1/organisations/{organisation.id}/plugins/dns-records").json()
assert response["boefje_schema"] is not None


def test_cannot_set_invalid_cron(test_client, organisation):
boefje = Boefje(id="test_plugin", name="My test boefje", description="123").model_dump(mode="json")
boefje["cron"] = "bad format"
Expand Down
109 changes: 57 additions & 52 deletions mula/scheduler/connectors/services/katalogus.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,90 +44,95 @@ def __init__(self, host: str, source: str, timeout: int, pool_connections: int,

def flush_caches(self) -> None:
self.flush_plugin_cache()
self.flush_normalizer_cache()
self.flush_boefje_cache()
self.flush_boefje_cache(self.plugin_cache)
self.flush_normalizer_cache(self.plugin_cache)

def flush_plugin_cache(self) -> None:
def flush_plugin_cache(self):
self.logger.debug("Flushing the katalogus plugin cache for organisations")

plugin_cache: dict = {}
orgs = self.get_organisations()
for org in orgs:
plugin_cache.setdefault(org.id, {})

plugins = self.get_plugins_by_organisation(org.id)
plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled}

with self.plugin_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.plugin_cache.expiration_enabled = False
self.plugin_cache.reset()

orgs = self.get_organisations()
for org in orgs:
self.plugin_cache.setdefault(org.id, {})

plugins = self.get_plugins_by_organisation(org.id)
self.plugin_cache[org.id] = {plugin.id: plugin for plugin in plugins if plugin.enabled}

self.plugin_cache.cache = plugin_cache
self.plugin_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus plugin cache for organisations")

def flush_boefje_cache(self) -> None:
def flush_boefje_cache(self, plugins=None) -> None:
"""boefje.consumes -> plugin type boefje"""
self.logger.debug("Flushing the katalogus boefje type cache for organisations")

with self.boefje_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.boefje_cache.expiration_enabled = False
self.boefje_cache.reset()

orgs = self.get_organisations()
for org in orgs:
self.boefje_cache[org.id] = {}
boefje_cache: dict = {}
orgs = self.get_organisations()
for org in orgs:
boefje_cache.setdefault(org.id, {})

for plugin in self.get_plugins_by_organisation(org.id):
if plugin.type != "boefje":
continue
org_plugins = plugins[org.id].values() if plugins else self.get_plugins_by_organisation(org.id)
for plugin in org_plugins:
if plugin.type != "boefje":
continue

if plugin.enabled is False:
continue
if plugin.enabled is False:
continue

if not plugin.consumes:
continue
if not plugin.consumes:
continue

# NOTE: backwards compatibility, when it is a boefje the
# consumes field is a string field.
if isinstance(plugin.consumes, str):
self.boefje_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
continue
# NOTE: backwards compatibility, when it is a boefje the
# consumes field is a string field.
if isinstance(plugin.consumes, str):
boefje_cache[org.id].setdefault(plugin.consumes, []).append(plugin)
continue

for type_ in plugin.consumes:
self.boefje_cache[org.id].setdefault(type_, []).append(plugin)
for type_ in plugin.consumes:
boefje_cache[org.id].setdefault(type_, []).append(plugin)

with self.boefje_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.boefje_cache.expiration_enabled = False
self.boefje_cache.reset()
self.boefje_cache.cache = boefje_cache
self.boefje_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus boefje type cache for organisations")

def flush_normalizer_cache(self) -> None:
def flush_normalizer_cache(self, plugins=None) -> None:
"""normalizer.consumes -> plugin type normalizer"""
self.logger.debug("Flushing the katalogus normalizer type cache for organisations")

with self.normalizer_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.normalizer_cache.expiration_enabled = False
self.normalizer_cache.reset()
normalizer_cache: dict = {}
orgs = self.get_organisations()
for org in orgs:
normalizer_cache.setdefault(org.id, {})

orgs = self.get_organisations()
for org in orgs:
self.normalizer_cache[org.id] = {}
org_plugins = plugins[org.id].values() if plugins else self.get_plugins_by_organisation(org.id)
for plugin in org_plugins:
if plugin.type != "normalizer":
continue

for plugin in self.get_plugins_by_organisation(org.id):
if plugin.type != "normalizer":
continue
if plugin.enabled is False:
continue

if plugin.enabled is False:
continue
if not plugin.consumes:
continue

if not plugin.consumes:
continue

for type_ in plugin.consumes:
self.normalizer_cache[org.id].setdefault(type_, []).append(plugin)
for type_ in plugin.consumes:
normalizer_cache[org.id].setdefault(type_, []).append(plugin)

with self.normalizer_cache_lock:
# First, we reset the cache, to make sure we won't get any ExpiredError
self.normalizer_cache.expiration_enabled = False
self.normalizer_cache.reset()
self.normalizer_cache.cache = normalizer_cache
self.normalizer_cache.expiration_enabled = True

self.logger.debug("Flushed the katalogus normalizer type cache for organisations")
Expand Down
Loading

0 comments on commit e9a4576

Please sign in to comment.