Reduce mirror status query madness
Move completely to custom SQL for this logic. The Django ORM just doesn't play nice with the kind of query we are looking to do, so it is easier to do using raw SQL. The biggest pain factor here is in supporting sqlite as it doesn't have nearly the capabilities in handling datetime types directly in the database, as well as having some different type conversion necessities. Signed-off-by: Dan McGee <dan@archlinux.org>
This commit is contained in:
parent
7fc8da7d95
commit
213aa3a2fa
146
mirrors/utils.py
146
mirrors/utils.py
@ -2,6 +2,7 @@
|
||||
|
||||
from django.db import connection
|
||||
from django.db.models import Avg, Count, Max, Min, StdDev
|
||||
from django.utils.dateparse import parse_datetime
|
||||
from django.utils.timezone import now
|
||||
from django_countries.fields import Country
|
||||
|
||||
@ -11,12 +12,94 @@
|
||||
|
||||
DEFAULT_CUTOFF = timedelta(hours=24)
|
||||
|
||||
def annotate_url(url, delay):
|
||||
|
||||
def dictfetchall(cursor):
|
||||
"Returns all rows from a cursor as a dict."
|
||||
desc = cursor.description
|
||||
return [
|
||||
dict(zip([col[0] for col in desc], row))
|
||||
for row in cursor.fetchall()
|
||||
]
|
||||
|
||||
|
||||
def status_data(cutoff_time, mirror_id=None):
|
||||
if mirror_id is not None:
|
||||
params = [cutoff_time, mirror_id]
|
||||
mirror_where = 'AND u.mirror_id = %s'
|
||||
else:
|
||||
params = [cutoff_time]
|
||||
mirror_where = ''
|
||||
|
||||
vendor = database_vendor(MirrorUrl)
|
||||
if vendor == 'sqlite':
|
||||
sql = """
|
||||
SELECT l.url_id, u.mirror_id,
|
||||
COUNT(l.id) AS check_count,
|
||||
COUNT(l.duration) AS success_count,
|
||||
MAX(l.last_sync) AS last_sync,
|
||||
MAX(l.check_time) AS last_check,
|
||||
AVG(l.duration) AS duration_avg,
|
||||
0.0 AS duration_stddev,
|
||||
AVG(STRFTIME('%%s', check_time) - STRFTIME('%%s', last_sync)) AS delay
|
||||
FROM mirrors_mirrorlog l
|
||||
JOIN mirrors_mirrorurl u ON u.id = l.url_id
|
||||
WHERE l.check_time >= %s
|
||||
""" + mirror_where + """
|
||||
GROUP BY l.url_id, u.mirror_id
|
||||
"""
|
||||
else:
|
||||
sql = """
|
||||
SELECT l.url_id, u.mirror_id,
|
||||
COUNT(l.id) AS check_count,
|
||||
COUNT(l.duration) AS success_count,
|
||||
MAX(l.last_sync) AS last_sync,
|
||||
MAX(l.check_time) AS last_check,
|
||||
AVG(l.duration) AS duration_avg,
|
||||
STDDEV(l.duration) AS duration_stddev,
|
||||
AVG(check_time - last_sync) AS delay
|
||||
FROM mirrors_mirrorlog l
|
||||
JOIN mirrors_mirrorurl u ON u.id = l.url_id
|
||||
WHERE l.check_time >= %s
|
||||
""" + mirror_where + """
|
||||
GROUP BY l.url_id, u.mirror_id
|
||||
"""
|
||||
|
||||
cursor = connection.cursor()
|
||||
cursor.execute(sql, params)
|
||||
url_data = dictfetchall(cursor)
|
||||
|
||||
# sqlite loves to return less than ideal types
|
||||
if vendor == 'sqlite':
|
||||
for item in url_data:
|
||||
item['delay'] = timedelta(seconds=item['delay'])
|
||||
item['last_sync'] = parse_datetime(item['last_sync'])
|
||||
item['last_check'] = parse_datetime(item['last_check'])
|
||||
|
||||
return {item['url_id']: item for item in url_data}
|
||||
|
||||
|
||||
def annotate_url(url, url_data):
|
||||
'''Given a MirrorURL object, add a few more attributes to it regarding
|
||||
status, including completion_pct, delay, and score.'''
|
||||
known_attrs = (
|
||||
('success_count', 0),
|
||||
('check_count', 0),
|
||||
('completion_pct', None),
|
||||
('last_check', None),
|
||||
('last_sync', None),
|
||||
('delay', None),
|
||||
('score', None),
|
||||
)
|
||||
for k, v in known_attrs:
|
||||
setattr(url, k, v)
|
||||
for k, v in url_data.items():
|
||||
if k not in ('url_id', 'mirror_id'):
|
||||
setattr(url, k, v)
|
||||
|
||||
if url.check_count > 0:
|
||||
url.completion_pct = float(url.success_count) / url.check_count
|
||||
if delay is not None:
|
||||
url.delay = delay
|
||||
|
||||
if url.delay is not None:
|
||||
hours = url.delay.days * 24.0 + url.delay.seconds / 3600.0
|
||||
|
||||
if url.completion_pct > 0:
|
||||
@ -24,34 +107,8 @@ def annotate_url(url, delay):
|
||||
else:
|
||||
# arbitrary small value
|
||||
divisor = 0.005
|
||||
url.score = (hours + url.duration_avg + url.duration_stddev) / divisor
|
||||
else:
|
||||
url.delay = None
|
||||
url.score = None
|
||||
|
||||
|
||||
def url_delays(cutoff_time, mirror_id=None):
|
||||
cursor = connection.cursor()
|
||||
if mirror_id is None:
|
||||
sql= """
|
||||
SELECT url_id, AVG(check_time - last_sync)
|
||||
FROM mirrors_mirrorlog
|
||||
WHERE is_success = %s AND check_time >= %s AND last_sync IS NOT NULL
|
||||
GROUP BY url_id
|
||||
"""
|
||||
cursor.execute(sql, [True, cutoff_time])
|
||||
else:
|
||||
sql = """
|
||||
SELECT l.url_id, avg(check_time - last_sync)
|
||||
FROM mirrors_mirrorlog l
|
||||
JOIN mirrors_mirrorurl u ON u.id = l.url_id
|
||||
WHERE is_success = %s AND check_time >= %s AND last_sync IS NOT NULL
|
||||
AND mirror_id = %s
|
||||
GROUP BY url_id
|
||||
"""
|
||||
cursor.execute(sql, [True, cutoff_time, mirror_id])
|
||||
|
||||
return {url_id: delay for url_id, delay in cursor.fetchall()}
|
||||
stddev = url.duration_stddev or 0.0
|
||||
url.score = (hours + url.duration_avg + stddev) / divisor
|
||||
|
||||
|
||||
@cache_function(123)
|
||||
@ -65,29 +122,14 @@ def get_mirror_statuses(cutoff=DEFAULT_CUTOFF, mirror_id=None):
|
||||
if mirror_id:
|
||||
valid_urls = valid_urls.filter(mirror_id=mirror_id)
|
||||
|
||||
url_data = MirrorUrl.objects.values('id', 'mirror_id').filter(
|
||||
id__in=valid_urls, logs__check_time__gte=cutoff_time).annotate(
|
||||
check_count=Count('logs'),
|
||||
success_count=Count('logs__duration'),
|
||||
last_sync=Max('logs__last_sync'),
|
||||
last_check=Max('logs__check_time'),
|
||||
duration_avg=Avg('logs__duration'))
|
||||
|
||||
vendor = database_vendor(MirrorUrl)
|
||||
if vendor != 'sqlite':
|
||||
url_data = url_data.annotate(duration_stddev=StdDev('logs__duration'))
|
||||
|
||||
url_data = status_data(cutoff_time, mirror_id)
|
||||
urls = MirrorUrl.objects.select_related('mirror', 'protocol').filter(
|
||||
id__in=valid_urls).order_by('mirror__id', 'url')
|
||||
delays = url_delays(cutoff_time, mirror_id)
|
||||
|
||||
if urls:
|
||||
url_data = dict((item['id'], item) for item in url_data)
|
||||
for url in urls:
|
||||
for k, v in url_data.get(url.id, {}).items():
|
||||
if k not in ('id', 'mirror_id'):
|
||||
setattr(url, k, v)
|
||||
last_check = max([u.last_check for u in urls])
|
||||
annotate_url(url, url_data.get(url.id, {}))
|
||||
last_check = max([u.last_check for u in urls if u.last_check])
|
||||
num_checks = max([u.check_count for u in urls])
|
||||
check_info = MirrorLog.objects.filter(check_time__gte=cutoff_time)
|
||||
if mirror_id:
|
||||
@ -104,12 +146,6 @@ def get_mirror_statuses(cutoff=DEFAULT_CUTOFF, mirror_id=None):
|
||||
num_checks = 0
|
||||
check_frequency = None
|
||||
|
||||
for url in urls:
|
||||
# fake the standard deviation for local testing setups
|
||||
if vendor == 'sqlite':
|
||||
setattr(url, 'duration_stddev', 0.0)
|
||||
annotate_url(url, delays.get(url.id, None))
|
||||
|
||||
return {
|
||||
'cutoff': cutoff,
|
||||
'last_check': last_check,
|
||||
|
Loading…
Reference in New Issue
Block a user