diff --git a/openwisp_monitoring/monitoring/base/models.py b/openwisp_monitoring/monitoring/base/models.py index d577937e9..f26aec6b7 100644 --- a/openwisp_monitoring/monitoring/base/models.py +++ b/openwisp_monitoring/monitoring/base/models.py @@ -12,7 +12,7 @@ from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ObjectDoesNotExist, ValidationError from django.core.validators import MaxValueValidator -from django.db import models +from django.db import IntegrityError, models from django.utils import timezone from django.utils.translation import gettext_lazy as _ from jsonfield import JSONField @@ -179,10 +179,18 @@ def _get_or_create( metric.extra_tags = cls._sort_dict(metric.extra_tags) metric.save() except cls.DoesNotExist: - metric = cls(**kwargs) - metric.full_clean() - metric.save() - created = True + try: + metric = cls(**kwargs) + metric.full_clean() + metric.save() + created = True + except IntegrityError: + # Potential race conditions may arise when multiple + # celery workers concurrently write data to InfluxDB. + # These simultaneous writes can result in the database + # processing transactions from another "metric.save()" + # call, potentially leading to IntegrityError exceptions. + return cls._get_or_create(**kwargs) return metric, created @classmethod diff --git a/openwisp_monitoring/monitoring/tests/test_models.py b/openwisp_monitoring/monitoring/tests/test_models.py index c63f0dc81..0fa40a19b 100644 --- a/openwisp_monitoring/monitoring/tests/test_models.py +++ b/openwisp_monitoring/monitoring/tests/test_models.py @@ -1,9 +1,11 @@ from datetime import timedelta +from unittest.mock import patch from django.contrib.auth import get_user_model from django.contrib.contenttypes.models import ContentType from django.core.cache import cache from django.core.exceptions import ValidationError +from django.db import IntegrityError from django.test import TestCase from django.utils import timezone from swapper import load_model @@ -131,6 +133,22 @@ def test_get_or_create_renamed_object(self): self.assertEqual(m2.name, m.name) self.assertFalse(created) + @patch.object( + Metric.objects, + 'get', + side_effect=[ + Metric.DoesNotExist, + Metric(name='lan', configuration='test_metric'), + ], + ) + @patch.object(Metric, 'save', side_effect=IntegrityError) + def test_get_or_create_integrity_error(self, mocked_save, mocked_get): + metric, _ = Metric._get_or_create(name='lan', configuration='test_metric') + mocked_save.assert_called_once() + self.assertEqual(mocked_get.call_count, 2) + self.assertEqual(metric.name, 'lan') + self.assertEqual(metric.configuration, 'test_metric') + def test_metric_write_wrong_related_fields(self): m = self._create_general_metric(name='ping', configuration='ping') extra_values = {'reachable': 0, 'rtt_avg': 0.51, 'rtt_max': 0.6, 'rtt_min': 0.4}