Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DCAS Farm Registry ingestor #331

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v1
with:
python-version: 3.7
python-version: 3.12
architecture: x64
- name: Checkout PyTorch
uses: actions/checkout@master
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ docs/mkdocs.yml

# locust auth files
locust_auth.json

.vscode-extensions/
3 changes: 2 additions & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ with import <nixpkgs> { };

let
# For packages pinned to a specific version
pinnedHash = "933d7dc155096e7575d207be6fb7792bc9f34f6d";
pinnedHash = "24.05";
pinnedPkgs = import (fetchTarball "https://github.com/NixOS/nixpkgs/archive/${pinnedHash}.tar.gz") { };
pythonPackages = pinnedPkgs.python3Packages;
in pkgs.mkShell rec {
Expand Down Expand Up @@ -41,6 +41,7 @@ in pkgs.mkShell rec {
pinnedPkgs.zlib
pinnedPkgs.gnused
pinnedPkgs.rpl
pinnedPkgs.vscode
];

# Run this command, only after creating the virtual environment
Expand Down
179 changes: 179 additions & 0 deletions django_project/gap/ingestor/farm_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
# coding=utf-8
"""
Tomorrow Now GAP.

.. note:: Ingestor for DCAS Farmer Registry data
"""

import csv
import os
import shutil
import uuid
import tempfile
import zipfile
from datetime import datetime, timezone
from django.contrib.gis.geos import Point
import logging
from gap.ingestor.base import BaseIngestor
from gap.ingestor.exceptions import (
FileNotFoundException, FileIsNotCorrectException,
)
from gap.models import (
Farm, Crop, FarmRegistry, FarmRegistryGroup,
IngestorSession, CropStageType
)
from django.db import transaction
from django.db.models import Q

logger = logging.getLogger(__name__)


class Keys:
    """Column-name constants for the data, plus a column validator."""

    # Names that ``check_columns`` expects to find in ``df.columns``.
    # NOTE(review): within this module only ``CROP`` is read (by the
    # ingestor's row processing); the other names look inherited from a
    # different ingestor -- confirm they are still needed here.
    CROP = 'crop'
    PARAMETER = 'parameter'
    GROWTH_STAGE = 'growth_stage'
    MIN_RANGE = 'min_range'
    MAX_RANGE = 'max_range'
    CODE = 'code'

    @staticmethod
    def check_columns(df) -> bool:
        """Check if all columns exist in dataframe.

        :param df: dataframe from csv; only its ``columns`` attribute is read
        :type df: pd.DataFrame
        :raises FileIsNotCorrectException: When column is missing
        :return: True when every expected column is present
        :rtype: bool
        """
        expected = (
            Keys.CROP, Keys.PARAMETER, Keys.GROWTH_STAGE,
            Keys.MIN_RANGE, Keys.MAX_RANGE, Keys.CODE,
        )

        # Collect every missing name so the error reports them all at once.
        missing = [key for key in expected if key not in df.columns]
        if missing:
            raise FileIsNotCorrectException(
                f'Column(s) missing: {",".join(missing)}'
            )

        # Honour the declared ``-> bool`` contract; previously the function
        # fell through and returned None on success.
        return True


class DCASFarmRegistryIngestor(BaseIngestor):
    """Ingestor for DCAS Farmer Registry data.

    The session file is expected to be a ZIP archive with a CSV file at its
    top level. Each CSV row is read through the ``FarmerId``,
    ``FinalLatitude``, ``FinalLongitude``, ``PlantingDate`` and ``crop``
    columns and stored as a ``FarmRegistry`` entry attached to a newly
    created ``FarmRegistryGroup``.
    """

    def __init__(self, session: IngestorSession, working_dir='/tmp'):
        """Initialize the ingestor with session and working directory.

        :param session: Ingestor session object
        :type session: IngestorSession
        :param working_dir: Directory to extract ZIP files temporarily
        :type working_dir: str, optional
        """
        super().__init__(session, working_dir)

        # Model used to create the registry group
        self.group_model = FarmRegistryGroup

        # Placeholder for the group created during this session
        self.group = None

    def _extract_zip_file(self):
        """Extract the session ZIP file to a fresh temporary directory.

        The uploaded file is first copied to a local temporary file and
        then extracted. The temporary archive is always removed; if the
        copy or the extraction fails, the partially filled extraction
        directory is removed as well before the error is re-raised.

        :return: Path of the directory holding the extracted files
        :rtype: str
        """
        dir_path = os.path.join(self.working_dir, str(uuid.uuid4()))
        os.makedirs(dir_path, exist_ok=True)

        tmp_file_path = None
        try:
            with self.session.file.open('rb') as zip_file:
                with tempfile.NamedTemporaryFile(
                        delete=False, dir=self.working_dir) as tmp_file:
                    tmp_file_path = tmp_file.name
                    # Stream in chunks rather than loading the whole
                    # upload into memory.
                    shutil.copyfileobj(zip_file, tmp_file)

            with zipfile.ZipFile(tmp_file_path, 'r') as zip_ref:
                zip_ref.extractall(dir_path)
        except Exception:
            # The caller never receives dir_path on failure, so it cannot
            # clean it up: do it here.
            shutil.rmtree(dir_path, ignore_errors=True)
            raise
        finally:
            if tmp_file_path and os.path.exists(tmp_file_path):
                os.remove(tmp_file_path)

        return dir_path

    def _create_registry_group(self):
        """Create a new FarmRegistryGroup and keep it on ``self.group``."""
        self.group = self.group_model.objects.create(
            date_time=datetime.now(timezone.utc),
            is_latest=True
        )

    def _process_row(self, row):
        """Process a single row from the input file.

        The row is handled inside its own ``transaction.atomic()`` block
        (a savepoint when called from ``_run``). A failing row is rolled
        back on its own and logged, and the surrounding transaction stays
        usable for the remaining rows.

        :param row: Mapping of CSV column name to value
        :type row: dict
        :return: True when the row was stored, False when it was skipped
        :rtype: bool
        """
        try:
            with transaction.atomic():
                # Parse latitude and longitude to create a geometry point
                latitude = float(row['FinalLatitude'])
                longitude = float(row['FinalLongitude'])
                point = Point(x=longitude, y=latitude, srid=4326)

                # Get or create the Farm instance
                farm, _ = Farm.objects.get_or_create(
                    unique_id=row['FarmerId'],
                    defaults={
                        'geometry': point
                    }
                )

                # The crop column carries "<crop>_<stage type>"
                crop_with_stage = row[Keys.CROP].lower().split('_')
                if len(crop_with_stage) < 2:
                    raise ValueError(
                        f"Crop value '{row[Keys.CROP]}' has no stage type"
                    )
                crop, _ = Crop.objects.get_or_create(
                    name__iexact=crop_with_stage[0],
                    defaults={
                        'name': crop_with_stage[0].title()
                    }
                )
                stage_type = CropStageType.objects.get(
                    Q(name__iexact=crop_with_stage[1]) |
                    Q(alias__iexact=crop_with_stage[1])
                )

                # Parse the planting date (month/day/year)
                planting_date = datetime.strptime(
                    row['PlantingDate'], '%m/%d/%Y').date()

                # Create the FarmRegistry entry
                FarmRegistry.objects.update_or_create(
                    group=self.group,
                    farm=farm,
                    crop=crop,
                    crop_stage_type=stage_type,
                    planting_date=planting_date,
                )
        except Exception as e:
            # Best-effort ingestion: a bad row is reported and skipped.
            logger.error("Error processing row: %s - %s", row, e)
            return False

        return True

    def _run(self, dir_path):
        """Run the ingestion logic on the extracted files.

        Only one CSV file is ingested: the first, in sorted name order,
        found at the top level of ``dir_path``. The registry group is
        created only after a CSV file has been found, inside the same
        transaction as the rows.

        :param dir_path: Directory holding the extracted ZIP content
        :type dir_path: str
        :raises FileNotFoundError: When the directory has no CSV file
        """
        csv_names = sorted(
            name for name in os.listdir(dir_path)
            if name.lower().endswith('.csv')
        )
        if not csv_names:
            raise FileNotFoundError("No CSV file found in the extracted ZIP.")

        file_path = os.path.join(dir_path, csv_names[0])
        total = 0
        failed = 0
        # newline='' is what the csv module expects; utf-8-sig strips a BOM
        # so the first header name is not corrupted.
        with open(file_path, 'r', newline='', encoding='utf-8-sig') as file:
            reader = csv.DictReader(file)
            with transaction.atomic():
                self._create_registry_group()
                logger.info(f"Created new registry group: {self.group.id}")
                for row in reader:
                    total += 1
                    if not self._process_row(row):
                        failed += 1

        if failed:
            logger.warning(
                "Farm registry ingestion skipped %s of %s rows",
                failed, total
            )

    def run(self):
        """Run the ingestion process.

        :raises FileNotFoundException: When the session has no file
        """
        if not self.session.file:
            raise FileNotFoundException("No file found for ingestion.")
        dir_path = self._extract_zip_file()
        try:
            self._run(dir_path)
        finally:
            # Always drop the extracted files, even when ingestion fails.
            shutil.rmtree(dir_path)
5 changes: 5 additions & 0 deletions django_project/gap/models/ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class IngestorType:
CABI_PRISE_EXCEL = 'Cabi Prise Excel'
CBAM_BIAS_ADJUST = 'CBAM Bias Adjusted'
DCAS_RULE = 'DCAS Rules'
FARM_REGISTRY = 'Farm Registry'


class IngestorSessionStatus:
Expand Down Expand Up @@ -82,6 +83,7 @@ class Meta: # noqa: D106
),
(IngestorType.CBAM_BIAS_ADJUST, IngestorType.CBAM_BIAS_ADJUST),
(IngestorType.DCAS_RULE, IngestorType.DCAS_RULE),
(IngestorType.FARM_REGISTRY, IngestorType.FARM_REGISTRY),
),
max_length=512
)
Expand Down Expand Up @@ -208,6 +210,7 @@ def _run(self, working_dir):
from gap.ingestor.cabi_prise import CabiPriseIngestor
from gap.ingestor.cbam_bias_adjust import CBAMBiasAdjustIngestor
from gap.ingestor.dcas_rule import DcasRuleIngestor
from gap.ingestor.farm_registry import DCASFarmRegistryIngestor

ingestor = None
if self.ingestor_type == IngestorType.TAHMO:
Expand All @@ -234,6 +237,8 @@ def _run(self, working_dir):
ingestor = CBAMBiasAdjustIngestor
elif self.ingestor_type == IngestorType.DCAS_RULE:
ingestor = DcasRuleIngestor
elif self.ingestor_type == IngestorType.FARM_REGISTRY:
ingestor = DCASFarmRegistryIngestor

if ingestor:
ingestor(self, working_dir).run()
Expand Down
Binary file not shown.
Binary file not shown.
68 changes: 68 additions & 0 deletions django_project/gap/tests/ingestor/test_farm_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# coding: utf-8
"""
Tomorrow Now GAP.

.. note:: Unit tests for DCASFarmRegistryIngestor.
"""

import os
import logging
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import TestCase
from gap.models import (
Farm, Crop, FarmRegistry, FarmRegistryGroup,
IngestorSession, IngestorSessionStatus
)
from gap.ingestor.farm_registry import DCASFarmRegistryIngestor


logger = logging.getLogger(__name__)


class DCASFarmRegistryIngestorTest(TestCase):
    """Unit tests for DCASFarmRegistryIngestor."""

    def setUp(self):
        """Resolve the path of the ZIP fixture used by the tests."""
        tests_dir = os.path.dirname(os.path.abspath(__file__))
        # Fixture lives in data/farm_registry next to this test module.
        self.test_zip_path = os.path.join(
            tests_dir, 'data', 'farm_registry', 'test_farm_registry.zip'
        )

    def test_successful_ingestion(self):
        """Ingest the fixture ZIP and check the records it produces."""
        with open(self.test_zip_path, 'rb') as zip_handle:
            upload = SimpleUploadedFile(zip_handle.name, zip_handle.read())

        session = IngestorSession.objects.create(
            file=upload,
            ingestor_type='Farm Registry',
            trigger_task=False
        )

        DCASFarmRegistryIngestor(session).run()

        # The session must be reported as successful
        session.refresh_from_db()
        self.assertEqual(session.status, IngestorSessionStatus.SUCCESS)

        # Exactly one registry group, flagged as the latest one
        self.assertEqual(FarmRegistryGroup.objects.count(), 1)
        latest_group = FarmRegistryGroup.objects.first()
        self.assertTrue(latest_group.is_latest)

        # Two farms and two registry entries come out of the fixture
        self.assertEqual(Farm.objects.count(), 2)
        self.assertEqual(FarmRegistry.objects.count(), 2)

        # Coordinates of a known farm
        known_farm = Farm.objects.get(unique_id='F001')
        self.assertEqual(known_farm.geometry.x, 36.8219)
        self.assertEqual(known_farm.geometry.y, -1.2921)

        # The crop is stored with a title-cased name
        maize = Crop.objects.get(name='Maize')
        self.assertIsNotNone(maize)
Loading