
Feat: Add functionality to calculate intra-cluster distances and compare them between original and fine-tuned models #49

Merged — 18 commits merged into main on Aug 2, 2024

Conversation

Devasy23 (Owner)
This pull request adds functionality to calculate intra-cluster distances and compare them between the original and fine-tuned models. It includes code changes to load the fine-tuned model, extract faces, calculate embeddings, and compute intra-cluster distances. The output reports the shift in intra-cluster distances, the mean distance change, the counts of positively and negatively affected clusters, and the number of unchanged clusters. These results give valuable insight into how the fine-tuned model performs relative to the original.

  • Add vector_search function for pipeline aggregation
  • Added Vector Search
  • chore: Add error handling for no match found in face recognition
  • Refactor face recognition code to use the Facenet512 model for better accuracy
  • Code added for recognize_face API
  • temp commit to resolve merge
  • Updated gitignore

Co-authored-by: Devansh Shah [email protected] @devansh-shah-11

@Devasy23 self-assigned this on Jul 29, 2024

@senior-dev-bot (bot) left a comment

Feedback from Senior Dev Bot

Comment on lines +1 to +78
import os

import numpy as np
from keras.models import load_model
from keras.preprocessing import image
from sklearn.metrics.pairwise import euclidean_distances

# Function to load and preprocess images
def load_and_preprocess_image(img_path, target_size=(160, 160)):
    img = image.load_img(img_path, target_size=target_size)
    img_array = image.img_to_array(img)
    img_array = np.expand_dims(img_array, axis=0)
    img_array /= 255.0
    return img_array

# Function to generate embeddings
def generate_embeddings(model, dataset_path):
    embeddings = []
    labels = []

    for class_name in os.listdir(dataset_path):
        class_path = os.path.join(dataset_path, class_name)
        if not os.path.isdir(class_path):
            continue

        for img_name in os.listdir(class_path):
            img_path = os.path.join(class_path, img_name)
            img_array = load_and_preprocess_image(img_path)
            embedding = model.predict(img_array)
            embeddings.append(embedding[0])
            labels.append(class_name)

    embeddings = np.array(embeddings)
    labels = np.array(labels)
    return embeddings, labels

# Function to calculate intra-cluster distances
def calculate_intra_cluster_distances(embeddings, labels):
    unique_labels = np.unique(labels)
    distances = []

    for label in unique_labels:
        cluster_embeddings = embeddings[labels == label]
        avg_embedding = np.mean(cluster_embeddings, axis=0)
        max_distance = np.max(euclidean_distances(cluster_embeddings, [avg_embedding]))
        distances.append(max_distance)

    return np.array(distances)

# Load the pre-trained FaceNet model (replace 'facenet_model.h5' with your model file)
model_path = 'facenet_model.h5'
model = load_model(model_path)

# Path to the dataset
dataset_path = 'path_to_your_dataset'

# Generate embeddings for the original model
embeddings_original, labels = generate_embeddings(model, dataset_path)

# Load the fine-tuned model (replace 'facenet_model_finetuned.h5' with your fine-tuned model file)
finetuned_model_path = 'facenet_model_finetuned.h5'
finetuned_model = load_model(finetuned_model_path)

# Generate embeddings for the fine-tuned model
embeddings_finetuned, _ = generate_embeddings(finetuned_model, dataset_path)

# Calculate intra-cluster distances for both models
intra_distances_original = calculate_intra_cluster_distances(embeddings_original, labels)
intra_distances_finetuned = calculate_intra_cluster_distances(embeddings_finetuned, labels)

# Compare intra-cluster distances: a negative change means the fine-tuned
# model produced a tighter (better) cluster for that identity
intra_distance_change = intra_distances_finetuned - intra_distances_original

# Output the results
print(f"Intra-Cluster Distance Change: {intra_distance_change}")
print(f"Mean Distance Change: {np.mean(intra_distance_change)}")
print(f"Positive Impact (distance decreased): {np.sum(intra_distance_change < 0)}")
print(f"Negative Impact (distance increased): {np.sum(intra_distance_change > 0)}")
print(f"No Change: {np.sum(intra_distance_change == 0)}")


CODE REVIEW

Consider modularizing the code further by creating separate utility functions and improving variable naming for better readability.

# Update imports
from os import listdir, path
import numpy as np
from keras.models import load_model
from keras.preprocessing import image
from sklearn.metrics.pairwise import euclidean_distances

# Function to calculate intra-cluster distances
def calculate_intra_cluster_distances(embeddings, labels):
    unique_labels = np.unique(labels)
    distances = []

    for label in unique_labels:
        cluster_embeddings = embeddings[labels == label]
        avg_embedding = np.mean(cluster_embeddings, axis=0)
        max_distance = np.max(euclidean_distances(cluster_embeddings, [avg_embedding]))
        distances.append(max_distance)

    return np.array(distances)

This refactoring separates concerns and improves code maintainability.
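As a quick sanity check (not part of the PR), the helper can be exercised with synthetic embeddings; the two "identities" and their coordinates below are made up for illustration:

```python
import numpy as np
from sklearn.metrics.pairwise import euclidean_distances

def calculate_intra_cluster_distances(embeddings, labels):
    unique_labels = np.unique(labels)
    distances = []
    for label in unique_labels:
        cluster_embeddings = embeddings[labels == label]
        avg_embedding = np.mean(cluster_embeddings, axis=0)
        # Max distance from any member to the cluster centroid
        distances.append(np.max(euclidean_distances(cluster_embeddings, [avg_embedding])))
    return np.array(distances)

# Two synthetic identities: a tight cluster near the origin, a looser one near (10, 10)
embeddings = np.array([[0.0, 0.0], [0.1, 0.0], [10.0, 10.0], [12.0, 10.0]])
labels = np.array(['a', 'a', 'b', 'b'])
distances = calculate_intra_cluster_distances(embeddings, labels)
print(distances)  # one max-distance value per identity, in sorted label order
```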

Comment on lines 8 to 14
from datetime import datetime
from io import BytesIO
from typing import List

from tensorflow.keras.models import load_model
from bson import ObjectId
from deepface import DeepFace
from dotenv import load_dotenv


CODE REVIEW

Import statements should be grouped in the following order:

  1. Standard library imports
  2. Related third party imports
  3. Local application/library specific imports
from datetime import datetime
from io import BytesIO
from typing import List
from bson import ObjectId

from dotenv import load_dotenv
from tensorflow.keras.models import load_model
from deepface import DeepFace

Comment on lines 143 to +146
FaceRec/static/Images/uploads/*
Images/dbImages/*
Images/Faces/*
Images/


CODE REVIEW

The changes seem to involve moving directories and adding an entire directory. It would be beneficial to provide more context and explanation behind these changes to ensure they are necessary. Consider breaking up these changes into smaller, more meaningful commits for better clarity and version control.

Consider providing more descriptive commit messages for clarity

API/route.py Outdated
Comment on lines 55 to 141
Department: str
Images: list[str]

def load_and_preprocess_image(img_path, target_size=(160, 160)):
    img = image.load_img(img_path, target_size=target_size)
    img_array = image.img_to_array(img)
    img_array = np.expand_dims(img_array, axis=0)
    img_array /= 255.0
    return img_array

def calculate_embeddings(image_filename):
    """
    Calculate embeddings for the provided image.

    Args:
        image_filename (str): The path to the image file.

    Returns:
        list: A list of embeddings for the image.
    """
    face_image_data = DeepFace.extract_faces(
        image_filename, detector_backend='mtcnn', enforce_detection=False,
    )
    new_image_path = 'Images/Faces/tmp.jpg'

    if face_image_data[0]['face'] is not None:
        plt.imsave(new_image_path, face_image_data[0]['face'])

    img_array = load_and_preprocess_image(new_image_path)
    model = load_model('Model/embedding_trial3.h5')
    embedding = model.predict(img_array)[0]
    embedding_list = embedding.tolist()
    logging.info('Embedding created')

    return embedding_list

@router.post('/recalculate_embeddings')
async def recalculate_embeddings():
    """
    Recalculate embeddings for all the images in the database.

    Returns:
        dict: A dictionary with a success message.

    Raises:
        None
    """
    logging.info('Recalculating embeddings')
    employees_mongo = client2.find(collection2)
    for employee in employees_mongo:
        print(employee, type(employee))
        embeddings = []

        # In the initial version, the images were stored in the 'Image' field
        if 'Images' in employee:
            images = employee['Images']
        else:
            images = employee['Image']

        for encoded_image in images:
            encoded_image = encoded_image.replace('data:image/png;base64,', '')
            encoded_image = encoded_image.strip()
            encoded_image += '=' * (-len(encoded_image) % 4)
            img_recovered = base64.b64decode(encoded_image)
            pil_image = Image.open(BytesIO(img_recovered))
            image_filename = f'{employee["Name"]}.png'
            pil_image.save(image_filename)
            logging.debug(f'Image saved {employee["Name"]}')
            embeddings.append(calculate_embeddings(image_filename))
            # os.remove(image_filename)

        logging.debug(f'About to update Embeddings: {embeddings}')
        # Store the data in the database
        client2.update_one(
            collection2,
            {'EmployeeCode': employee['EmployeeCode']},
            {'$set': {'embeddings': embeddings, 'Images': images}},
        )

    return {'message': 'Embeddings Recalculated successfully'}


# To create new entries of employee
@router.post('/create_new_faceEntry')


CODE REVIEW

The code changes look good. One suggestion is to handle potential errors during image processing (e.g., invalid image formats). Consider adding input validation and error handling to improve the robustness of the code.

try:
    pil_image = Image.open(BytesIO(img_recovered))
except IOError as e:
    logging.error(f'Error in opening image: {str(e)}')
    continue  # Skip to the next image
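The same guard can sit next to the base64 handling used in `recalculate_embeddings`. A minimal sketch of that combination, assuming the `decode_data_url` helper name (which is mine, not from the PR):

```python
import base64

def decode_data_url(encoded_image: str) -> bytes:
    """Strip an optional data-URL prefix and restore any stripped '=' padding."""
    encoded_image = encoded_image.replace('data:image/png;base64,', '').strip()
    # A valid base64 string's length must be a multiple of 4
    encoded_image += '=' * (-len(encoded_image) % 4)
    return base64.b64decode(encoded_image)

# Padding was stripped from this payload; the helper restores it before decoding
raw = decode_data_url('data:image/png;base64,aGVsbG8')
print(raw)  # b'hello'
```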

Comment on lines 207 to 213
        list[Employee]: A list of Employee objects containing employee information.
    """
    logging.info('Displaying all employees')
    employees_mongo = client.find(collection)
    employees_mongo = client2.find(collection2)
    logging.info(f'Employees found {employees_mongo}')
    employees = [
        Employee(


CODE REVIEW

Consider adding more descriptive variable names for clarity. Utilize type hints for better code readability.

    employees_mongo = client.find(collection)  # consider renaming to employees_mongo = client.find_employee_data(collection)
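One way to act on the type-hint suggestion is to convert the raw documents in a small typed helper. This is a sketch only: the `Employee` fields are reduced to two for brevity, and `build_employees` is a hypothetical name, not a function in the PR:

```python
from dataclasses import dataclass
from typing import Any

@dataclass
class Employee:
    EmployeeCode: str
    Name: str

def build_employees(employee_docs: list[dict[str, Any]]) -> list[Employee]:
    """Convert raw MongoDB documents into typed Employee objects."""
    return [Employee(EmployeeCode=d['EmployeeCode'], Name=d['Name']) for d in employee_docs]

docs = [{'EmployeeCode': 'E001', 'Name': 'Alice'}]
employees = build_employees(docs)
print(employees[0].Name)  # Alice
```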

Comment on lines 363 to 369
"""
logging.info('Deleting Employee')
logging.debug(f'Deleting for EmployeeCode: {EmployeeCode}')
client.find_one_and_delete(collection, {'EmployeeCode': EmployeeCode})
client2.find_one_and_delete(collection2, {'EmployeeCode': EmployeeCode})

return {'Message': 'Successfully Deleted'}


CODE REVIEW

Consider abstracting the database client logic to improve modularity and maintainability.

def delete_employee(client, collection, EmployeeCode):
    client.find_one_and_delete(collection, {'EmployeeCode': EmployeeCode})
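Since the route deletes from two databases, the bot's sketch could be extended to run the same delete against each (client, collection) pair. This is a sketch with a hypothetical helper name and a stand-in fake client, not the project's Mongo wrapper:

```python
class _FakeClient:
    """Stand-in for the project's Mongo wrapper, for demonstration only."""
    def __init__(self):
        self.deleted = []
    def find_one_and_delete(self, collection, query):
        self.deleted.append((collection, query))

def delete_employee_everywhere(clients_and_collections, employee_code):
    """Run the same delete against every (client, collection) pair."""
    for client, collection in clients_and_collections:
        client.find_one_and_delete(collection, {'EmployeeCode': employee_code})

c1, c2 = _FakeClient(), _FakeClient()
delete_employee_everywhere([(c1, 'employees'), (c2, 'employees_v2')], 'E001')
print(c1.deleted, c2.deleted)
```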

Comment on lines 289 to 296
"""
logging.debug(f'Updating for EmployeeCode: {EmployeeCode}')
try:
user_id = client.find_one(
collection, {'EmployeeCode': EmployeeCode}, projection={'_id': True},
user_id = client2.find_one(
collection2, {'EmployeeCode': EmployeeCode}, projection={'_id': True},
)
print(user_id)
if not user_id:


CODE REVIEW

It's good practice to use meaningful variable names. Consider renaming client to something like previous_client for clarity. Also, ensure consistency in variable naming (collection vs collection2). Lastly, consider handling potential exceptions when calling client2.find_one.

previous_client = client
new_client = client2
user_id = new_client.find_one(
    new_collection, {'EmployeeCode': EmployeeCode}, projection={'_id': True},
)

Comment on lines 157 to 163
        '\r\n',
        '',
    ).replace('\n', '')
    EmployeeCode = Employee.EmployeeCode.replace('\r\n', '').replace('\n', '')
    EmployeeCode = Employee.EmployeeCode
    gender = Employee.gender.replace('\r\n', '').replace('\n', '')
    Department = Employee.Department.replace('\r\n', '').replace('\n', '')
    encoded_images = Employee.Images


CODE REVIEW

Consider simplifying and enhancing code readability by removing unnecessary .replace functions. Also, ensure code consistency by following a consistent naming convention.

EmployeeCode = Employee.EmployeeCode
gender = Employee.gender
Department = Employee.Department
encoded_images = Employee.Images
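If the newline stripping really is needed for some form-encoded inputs, it could be centralized in a single helper rather than repeated `.replace` chains. A sketch, with `strip_newlines` as a hypothetical name:

```python
def strip_newlines(value: str) -> str:
    """Remove CR/LF characters that can leak in from multipart form fields."""
    return value.replace('\r\n', '').replace('\n', '')

# Apply one helper to every field that needs normalizing
gender = strip_newlines('Female\r\n')
department = strip_newlines('HR\n')
print(gender, department)  # Female HR
```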

Comment on lines 308 to 326
            image_filename = f'{Employee.Name}.png'
            pil_image.save(image_filename)
            logging.debug(f'Image saved {Employee.Name}')
            face_image_data = DeepFace.extract_faces(
                image_filename, detector_backend='mtcnn', enforce_detection=False,
            )
            embedding = DeepFace.represent(
                image_filename, model_name='Facenet', detector_backend='mtcnn',
            )
            logging.debug(f'Embedding created {Employee.Name}')
            embeddings.append(embedding)
            os.remove(image_filename)

            # embedding = DeepFace.represent(
            #     image_filename, model_name='Facenet', detector_backend='mtcnn',
            # )

            embeddings.append(calculate_embeddings(image_filename))
            # os.remove(image_filename)

        Employee_data['embeddings'] = embeddings

        try:
            update_result = client.update_one(
                collection,
            update_result = client2.update_one(
                collection2,
                {'_id': ObjectId(user_id['_id'])},
                update={'$set': Employee_data},
            )


CODE REVIEW

Consider removing commented-out code for clarity. Simplify by extracting the logic for computing embeddings into a separate function for better separation of concerns.

def calculate_embeddings(image_filename):
    embedding = DeepFace.represent(
        image_filename, model_name='Facenet', detector_backend='mtcnn',
    )
    return embedding

API/route.py Outdated
Comment on lines 384 to 421
"""
logging.info('Recognizing Face')
try:
# Code to calculate embeddings via Original Facenet model

img_data = await Face.read()
with open('temp.png', 'wb') as f:
image_filename = 'temp.png'
with open(image_filename, 'wb') as f:
f.write(img_data)

embedding = DeepFace.represent(
img_path='temp.png', model_name='Facenet512', detector_backend='mtcnn',
# embedding = DeepFace.represent(
# img_path='temp.png', model_name='Facenet512', detector_backend='mtcnn',
# )

# Code to calculate embeddings via Finetuned Facenet model
face_image_data = DeepFace.extract_faces(
image_filename, detector_backend='mtcnn', enforce_detection=False,
)
result = client2.vector_search(collection2, embedding[0]['embedding'])
logging.info(f"Result: {result[0]['Name']}, {result[0]['score']}")
os.remove('temp.png')
if result[0]['score'] < 0.5:
return Response(
status_code=404, content=json.dumps({'message': 'No match found'}),
)

if face_image_data and face_image_data[0]['face'] is not None:

plt.imsave(f'Images/Faces/tmp.jpg', face_image_data[0]['face'])
face_image_path = f'Images/Faces/tmp.jpg'
img_array = load_and_preprocess_image(face_image_path)

model = load_model('Model/embedding_trial3.h5')
embedding_list = model.predict(img_array)[0] # Get the first prediction
print(embedding_list, type(embedding_list))
embedding = embedding_list.tolist()
result = client2.vector_search(collection2, embedding)
logging.info(f"Result: {result[0]['Name']}, {result[0]['score']}")
os.remove('temp.png')
if result[0]['score'] < 0.5:
return Response(
status_code=404, content=json.dumps({'message': 'No match found'}),
)
except Exception as e:
logging.error(f'Error: {e}')
os.remove('temp.png')


CODE REVIEW

Consider refactoring to extract common functionality into separate functions for better organization and maintainability.

# Extract common functionality into separate functions
async def calculate_face_embedding(img_data: bytes) -> List[float]:
    image_filename = 'temp.png'
    with open(image_filename, 'wb') as f:
        f.write(img_data)
    
    # Code to calculate embeddings via Original Facenet model
    
    embedding = calculate_original_facenet_embedding(image_filename)
    return embedding

async def recognize_face(img_data: bytes):
    try:
        embedding = await calculate_face_embedding(img_data)
        # Code to calculate embeddings via Finetuned Facenet model
        
        if face_image_data and face_image_data[0]['face'] is not None:
            process_and_search_face(face_image_data)
    except Exception as e:
        handle_recognition_error(e)

logging.info('Recalculating embeddings')
employees_mongo = client2.find(collection2)
for employee in employees_mongo:
    print(employee, type(employee))

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information (High)

This expression logs sensitive data (private) as clear text.
pil_image = Image.open(BytesIO(img_recovered))
image_filename = f'{employee["Name"]}.png'
pil_image.save(image_filename)
logging.debug(f'Image saved {employee["Name"]}')

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information (High)

This expression logs sensitive data (private) as clear text.

Quality Gate failed

Failed conditions
37.8% Coverage on New Code (required ≥ 80%)
E Security Rating on New Code (required ≥ A)

See analysis details on SonarCloud


@devansh-shah-11 merged commit 9f97d1a into main on Aug 2, 2024
6 of 8 checks passed