-
Notifications
You must be signed in to change notification settings - Fork 0
/
fitness_evaluator.py
108 lines (93 loc) · 4.42 KB
/
fitness_evaluator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import os
import re
import time
import psutil
class FitnessEvaluator:
@staticmethod
def execute_program(program_code):
try:
local_scope = {}
lines = program_code.split('\n')
import_lines = [line for line in lines if line.startswith('import') or line.startswith('from')]
other_lines = [line for line in lines if not (line.startswith('import') or line.startswith('from'))]
import_code = '\n'.join(import_lines)
other_code = '\n'.join(other_lines)
exec(import_code, globals(), local_scope)
exec(other_code, globals(), local_scope)
return local_scope.get('optimized_bucket_filler')
except Exception as e:
print(f"Error executing program: {e}")
return None
@staticmethod
def measure_time(func, *args, **kwargs):
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
return end_time - start_time, result
@staticmethod
def measure_memory(func, *args, **kwargs):
process = psutil.Process(os.getpid())
memory_before = process.memory_info().rss
result = func(*args, **kwargs)
memory_after = process.memory_info().rss
return memory_after - memory_before, result
@staticmethod
def scoring_function(buckets, bucket_limit):
total_empty_space = sum(bucket_limit - sum(bucket) for bucket in buckets)
distribution_score = FitnessEvaluator.calculate_distribution_score(buckets)
variance_score = FitnessEvaluator.calculate_variance_score(buckets, bucket_limit)
score = (1000 - total_empty_space) - (10 * len(buckets)) + distribution_score + variance_score
return max(score, 0) # Ensure the score is not negative
@staticmethod
def calculate_distribution_score(buckets):
if not buckets:
return 0
average_fill = sum(sum(bucket) for bucket in buckets) / len(buckets)
return 100 - sum(abs(sum(bucket) - average_fill) for bucket in buckets) / len(buckets)
@staticmethod
def calculate_variance_score(buckets, bucket_limit):
if not buckets:
return 0
total_variance = sum((sum(bucket) - bucket_limit / 2) ** 2 for bucket in buckets) / len(buckets)
return 100 - total_variance / (bucket_limit / 2) ** 2
@staticmethod
def calculate_fitness_score(time_taken, memory_used, custom_score, weights):
normalized_time_score = 100 / (1 + time_taken)
normalized_memory_score = 100 / (1 + memory_used)
normalized_custom_score = (custom_score / 1000) * 100
fitness_score = (weights['time'] * normalized_time_score +
weights['memory'] * normalized_memory_score +
weights['score'] * normalized_custom_score)
return fitness_score
@staticmethod
def evaluate_algorithm(program_code_str, numberList, bucket_limit, weights):
algorithm_func = FitnessEvaluator.execute_program(program_code_str)
if callable(algorithm_func):
numberList_copy = numberList[:]
time_taken, buckets = FitnessEvaluator.measure_time(algorithm_func, numberList_copy, bucket_limit)
memory_used, _ = FitnessEvaluator.measure_memory(algorithm_func, numberList_copy, bucket_limit)
score = FitnessEvaluator.scoring_function(buckets, bucket_limit)
fitness_score = FitnessEvaluator.calculate_fitness_score(time_taken, memory_used, score, weights)
return {
"time_taken": time_taken,
"memory_used": memory_used,
"score": score,
"fitness_score": fitness_score,
"buckets": buckets
}
else:
print("Function 'optimized_bucket_filler' not found in the provided code.")
return None
class GeneticAlgorithmConfig:
def __init__(self, generations=5, population_size=6):
self.generations = generations
self.population_size = population_size
self.previous_results = []
def is_iteration_unique(self, current_results):
for prev_results in self.previous_results:
if (current_results['score'] == prev_results['score'] and
current_results['fitness_score'] == prev_results['fitness_score']):
return False
return True
def add_result(self, result):
self.previous_results.append(result)