Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature | Query Estimate #549

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions core/migrations/0041_operation_estimated_run_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.1 on 2021-05-10 20:01

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the ``estimated_run_time`` field to the ``operation`` model."""

    # Must be applied after the core app's 0040 merge migration.
    dependencies = [
        ('core', '0040_merge_20210420_1128'),
    ]

    operations = [
        # Nullable integer field; rows that do not supply a value get 0.
        migrations.AddField(
            model_name='operation',
            name='estimated_run_time',
            field=models.IntegerField(default=0, null=True),
        ),
    ]
1 change: 1 addition & 0 deletions core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class Operation(BaseEntity):
{"type": "[error, warning, info]", "message": "", ""}
"""
logs = models.JSONField(blank=True, null=True, default=dict)
estimated_run_time = models.IntegerField(null=True, default=0)

def __str__(self):
return self.name
Expand Down
1 change: 1 addition & 0 deletions core/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ class Meta:
'aliases',
'alias_creation_status',
'logs',
'estimated_run_time',
)

def create(self, validated_data):
Expand Down
16 changes: 16 additions & 0 deletions core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from core.pypika_fts_utils import TableQueryBuilder
from data.db_manager import count_rows, run_query
from core.models import FrozenData, Operation, SavedQueryData, Source
from core import query

@app.task(bind=False)
def count_operation_rows(id):
Expand Down Expand Up @@ -72,3 +73,18 @@ def create_dataset_archive(id):
return { "status": "failed", "result": json.dumps(create_result) }
except SavedQueryData.DoesNotExist:
return { "status": "errored", "result": id }


@app.task(bind=False)
def estimate_operation_time(id):
    """Estimate an operation's query run time and store it on the operation.

    Looks up the ``Operation`` with the given ``id``, asks
    ``query.querytime_estimate`` for an estimate and, when the first entry of
    the estimate reports ``'success'``, saves its ``'message'`` (converted to
    ``int``) as ``operation.estimated_run_time``.

    Returns a dict with two keys:
      * ``status`` - ``"success"`` or ``"error"``.
      * ``result`` - the first estimate entry on success, otherwise the
        estimate's message or a "does not exist" notice.
    """
    # Keep the try body to the one call expected to raise DoesNotExist.
    try:
        operation = Operation.objects.get(id=id)
    except Operation.DoesNotExist:
        return {"status": "error", "result": "Operation does not exist"}

    outcome = query.querytime_estimate(operation=operation)[0]
    if outcome['result'] != 'success':
        # The status stays a fixed token; the detail travels in "result" only.
        return {"status": "error", "result": outcome['message']}

    operation.estimated_run_time = int(outcome['message'])
    operation.save()
    return {"status": "success", "result": outcome}
1 change: 0 additions & 1 deletion core/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
path('dataset/preview/', core_views.PreviewOperationData.as_view()),
path('dataset/data/<int:pk>/', core_views.ViewData.as_view()),
path('dataset/alias/<int:pk>/', core_views.OperationColumnAlias.as_view()),
path('dataset/estimate/<int:pk>/', core_views.EstimateQueryTime.as_view()),
# For handling saving query sets, and freezing data
path('savedquerysets/', core_views.SavedQueryDataList.as_view()),
path('savedqueryset/<int:pk>/', core_views.SavedQueryDataDetail.as_view()),
Expand Down
25 changes: 2 additions & 23 deletions core/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
ScheduledEventSerializer, SectorSerializer,
SourceSerializer, TagSerializer, ThemeSerializer,
UserSerializer)
from core.tasks import create_dataset_archive, create_table_archive
from core.tasks import create_dataset_archive, create_table_archive, estimate_operation_time
from data.db_manager import run_query, update_table_from_tuple
from data_updates.utils import ScriptExecutor, list_update_scripts

Expand Down Expand Up @@ -328,6 +328,7 @@ def get_queryset(self):

def perform_create(self, serializer):
serializer.save(user=self.request.user)
estimate_operation_time.delay(serializer.instance.id)


class UserOperationList(generics.ListAPIView):
Expand Down Expand Up @@ -1006,25 +1007,3 @@ def get(self, request, pk, format=None):
raise CustomAPIException({'detail': str(e)})


class EstimateQueryTime(APIView):
    """
    Return the estimated query time of a single operation.
    """
    authentication_classes = [TokenAuthentication]
    permission_classes = (permissions.IsAuthenticatedOrReadOnly & IsOwnerOrReadOnly,)

    def get_queryset(self, pk):
        """Fetch the operation whose id is ``pk``; raise Http404 when absent."""
        try:
            return Operation.objects.get(id=pk)
        except Operation.DoesNotExist:
            raise Http404

    def get(self, request, pk):
        """Respond with the query-time estimate for operation ``pk``."""
        try:
            target = self.get_queryset(pk)
            return Response(query.querytime_estimate(operation=target))
        except Exception as e:
            # Every failure, including the Http404 raised by the lookup above,
            # is reported and then surfaced as a CustomAPIException.
            handle_uncaught_error(e)
            raise CustomAPIException({'detail': str(e)})
11 changes: 6 additions & 5 deletions data/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,13 @@ def analyse_query(query, database="datasets"):
analyse_cursor.execute(analyse_query)
raw_results = list(analyse_cursor.fetchall())
time_in_ms = 0.0
ms_in_one_second = 1000000.0
# We get last two i.e planning time and execution time
for raw_result in raw_results[-2]:
time_event, = re.findall('Time: ([\d\.]+) ms', raw_result)
time_in_ms += float(time_event)
# time_in_seconds = float(time_in_ms/ms_in_one_second)
for raw_result in raw_results:
if not isinstance(raw_result[0], str):
continue
time_event = re.findall('Time: ([\d\.]+) ms', raw_result[0])
if len(time_event) == 1:
time_in_ms += float(time_event[0])
results = [
{
"result": "success",
Expand Down
22 changes: 15 additions & 7 deletions data/management/commands/dry_run_current_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,21 @@ def checkAllQueries(self):
queries = Operation.objects.all()
for query in queries:
print(query.id)
sql = QueryBuilder(operation=query).get_sql(limit=2)
results = analyse_query(sql)
if results[0]['result'] == 'success':
continue
else:
self.stdout.write(self.style.ERROR("Failed for Operation {} - {} with error {}".format(query.id, query.name, results[0]['error'])))
input('Press Enter to continue...')
try:
sql = QueryBuilder(operation=query).get_sql(limit=2)
self.stdout.write(self.style.SUCCESS("{}".format(sql)))
results = analyse_query(sql)
if results[0]['result'] == 'success':
continue
else:
self.stdout.write(self.style.ERROR("Failed for Operation {} - {} with error {}".format(query.id, query.name, results[0]['message'])))
input('Press Enter to continue...')
except AttributeError as error:
self.stdout.write(self.style.NOTICE('Failed for Operation {} - {} with attribute error {}'.format(query.id, query.name, error)))
except TypeError as error:
self.stdout.write(self.style.NOTICE('Failed for Operation {} - {} with type error {}'.format(query.id, query.name, error)))
except Exception as error:
self.stdout.write(self.style.NOTICE('Failed for Operation {} - {} with error {}'.format(query.id, query.name, error)))

def checkQueriesBySource(self, source, old_cols):
queries = Operation.objects.all()
Expand Down
54 changes: 54 additions & 0 deletions data/management/commands/update_query_estimate_times.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from django.conf import settings
from django.core.management.base import BaseCommand

from core.models import Operation
from core.pypika_utils import QueryBuilder
from core import query

class Command(BaseCommand):
    """Recompute and store ``estimated_run_time`` for saved operations."""

    def add_arguments(self, parser):
        """Register the command-line flags accepted by this command."""
        parser.add_argument('-a', '--all', action='store_true', help='Default - Used if you want to update all current queries')

    def handle(self, *args, **kwargs):
        """Entry point: refresh the estimate of every operation.

        Updating everything is the only mode wired up, so the ``--all`` flag
        is accepted but does not change what runs.
        """
        self.updateAllQueries()

    def updateAllQueries(self):
        """Refresh the stored estimate of every ``Operation``."""
        self._update_operations(Operation.objects.all())

    def updateById(self, id):
        """Refresh the stored estimate of the ``Operation`` with the given id."""
        self._update_operations(Operation.objects.filter(id__in=[id]))

    def _update_operations(self, operations):
        """Estimate and save the run time of each operation, reporting per item.

        A failure on one operation is written to stdout and does not stop the
        remaining operations from being processed.
        """
        for operation in operations:
            try:
                estimate = query.querytime_estimate(operation=operation)
                if estimate[0]['result'] == 'success':
                    operation.estimated_run_time = int(estimate[0]['message'])
                    operation.save()
                    self.stdout.write(self.style.SUCCESS("Done for {} - {}".format(operation.id, operation.name)))
                else:
                    # Report the estimator's own message for the failed query.
                    self.stdout.write(self.style.ERROR("Failed for Operation {} - {} with error {}".format(operation.id, operation.name, estimate[0]['message'])))
            except AttributeError as error:
                self.stdout.write(self.style.NOTICE('Failed for Operation {} - {} with attribute error {}'.format(operation.id, operation.name, error)))
            except TypeError as error:
                self.stdout.write(self.style.NOTICE('Failed for Operation {} - {} with type error {}'.format(operation.id, operation.name, error)))
            except Exception as error:
                # Batch boundary: log and carry on with the next operation.
                self.stdout.write(self.style.NOTICE('Failed for Operation {} - {} with error {}'.format(operation.id, operation.name, error)))
5 changes: 5 additions & 0 deletions data_updates/update_query_estimates.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash
# Refresh the stored run-time estimate of all current queries.

# Stop if the project directory is unavailable, instead of invoking
# manage.py from whatever directory the caller happened to be in.
cd /src || exit 1

python3 manage.py update_query_estimate_times -a
56 changes: 56 additions & 0 deletions frontend/cypress/fixtures/dataset4.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
{
"id": 4,
"name": "Test Column Alias Logs",
"description": "Test Column Alias Logs",
"operation_query": "('SELECT COUNT(\"sq0\".*) FROM (SELECT \"country_name\",\"country_code\",\"iso3\" FROM \"repo\".\"crs_current_isos\") \"sq0\"', 'SELECT \"country_name\",\"country_code\",\"iso3\" FROM \"repo\".\"crs_current_isos\"')",
"row_count": 15,
"theme": 1,
"sample_output_path": null,
"tags": [],
"operation_steps": [
{
"id": 1,
"step_id": 1,
"name": "t",
"description": "t",
"query_func": "select",
"query_kwargs": null,
"source": 1,
"created_on": "2021-05-05T06:31:59.960530Z",
"updated_on": "2021-05-05T06:31:59.960565Z"
}
],
"reviews": [],
"is_draft": true,
"user": "Admin",
"created_on": "2021-05-05T06:31:59.952735Z",
"updated_on": "2021-05-19T08:20:31.405966Z",
"aliases": [
{
"id": 44,
"column_name": "country_name",
"column_alias": "Country name"
},
{
"id": 45,
"column_name": "country_code",
"column_alias": "Country code"
},
{
"id": 46,
"column_name": "iso3",
"column_alias": "ISO Alpha 3"
}
],
"alias_creation_status": "d",
"logs": {
"type": "warning",
"steps": [1],
"columns": [
"country_code",
"country_name"
],
"message": "Obsolete Columns"
},
"estimated_run_time": 4000
}
Loading