Commit 928a2797 authored by Flavio TARSETTI

Merge branch 'logger_cleanup' into 'master'

Logger cleanup

See merge request !69
parents 1cdad443 fb229608
Pipeline #38028 failed with stages in 90 minutes
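
The bulk of this merge request replaces the deprecated `Logger.warn` alias with `Logger.warning`, alongside exception-handling and formatting cleanups shown in the hunks below. A minimal sketch of the preferred call, assuming a module-level logger as used throughout this code base (the filename value is a placeholder):

import logging

logger = logging.getLogger(__name__)

# `Logger.warn` is an obsolete alias kept for backwards compatibility and can
# emit a DeprecationWarning on recent Python versions; call `warning` instead.
logger.warning("No files found for %s", "/path/to/cache")
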
@@ -415,7 +415,7 @@ class CachedDataSource(DataSource):
) = getAllFilenames(filename, start_index, end_index)
if len(self.filenames) == 0:
logger.warn("No files found for %s" % filename)
logger.warning("No files found for %s" % filename)
return False
check_consistency(self.filenames, data_checksum_filenames)
@@ -429,7 +429,7 @@ class CachedDataSource(DataSource):
try:
f = open(current_filename, "rb")
except Exception as e:
logger.warn("Could not setup `%s': %s" % (filename, e))
logger.warning("Could not setup `%s': %s" % (filename, e))
return False
# Reads the header of the current file
@@ -42,12 +42,10 @@ data_loaders
This module implements all the data communication related classes
"""
import logging
import six
from .data import mixDataIndices
logger = logging.getLogger(__name__)
# ----------------------------------------------------------
@@ -101,24 +99,25 @@ class DataView(object):
current_start = self.data_index_start
for i in range(self.data_index_start, self.data_index_end + 1):
for indices in infos['data_indices']:
for indices in infos["data_indices"]:
if indices[1] == i:
input_data_indices.append( (current_start, i) )
input_data_indices.append((current_start, i))
current_start = i + 1
break
if (len(input_data_indices) == 0) or (input_data_indices[-1][1] != self.data_index_end):
input_data_indices.append( (current_start, self.data_index_end) )
if (len(input_data_indices) == 0) or (
input_data_indices[-1][1] != self.data_index_end
):
input_data_indices.append((current_start, self.data_index_end))
self.infos[input_name] = dict(
data_source = infos['data_source'],
data_indices = input_data_indices,
data = None,
start_index = -1,
end_index = -1,
data_source=infos["data_source"],
data_indices=input_data_indices,
data=None,
start_index=-1,
end_index=-1,
)
def count(self, input_name=None):
"""Returns the number of available data indexes for the given input
name. If none given the number of available data units.
@@ -134,30 +133,30 @@ class DataView(object):
"""
if input_name is not None:
try:
return len(self.infos[input_name]['data_indices'])
except:
return len(self.infos[input_name]["data_indices"])
except Exception:
return None
else:
return self.nb_data_units
def __getitem__(self, index):
if index < 0:
return (None, None, None)
try:
indices = self.data_indices[index]
except:
except Exception:
return (None, None, None)
result = {}
for input_name, infos in self.infos.items():
if (indices[0] < infos['start_index']) or (infos['end_index'] < indices[0]):
(infos['data'], infos['start_index'], infos['end_index']) = \
infos['data_source'].getAtDataIndex(indices[0])
if (indices[0] < infos["start_index"]) or (infos["end_index"] < indices[0]):
(infos["data"], infos["start_index"], infos["end_index"]) = infos[
"data_source"
].getAtDataIndex(indices[0])
result[input_name] = infos['data']
result[input_name] = infos["data"]
return (result, indices[0], indices[1])
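
A second recurring change is the replacement of bare `except:` clauses with `except Exception:`. A minimal sketch of the difference, mirroring the `count()` lookup above (dictionary contents are illustrative):

# A bare `except:` also catches KeyboardInterrupt and SystemExit, which should
# normally propagate; `except Exception:` lets them through while still
# handling missing keys and similar errors.
def count_or_none(infos, input_name):
    try:
        return len(infos[input_name]["data_indices"])
    except Exception:
        return None

assert count_or_none({"labels": {"data_indices": [(0, 3)]}}, "labels") == 1
assert count_or_none({}, "missing") is None
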
@@ -211,28 +210,27 @@ class DataLoader(object):
self.data_index_start = -1 # Lower index across all inputs
self.data_index_end = -1 # Bigger index across all inputs
def add(self, input_name, data_source):
self.infos[input_name] = dict(
data_source = data_source,
data_indices = data_source.data_indices(),
data = None,
start_index = -1,
end_index = -1,
data_source=data_source,
data_indices=data_source.data_indices(),
data=None,
start_index=-1,
end_index=-1,
)
self.mixed_data_indices = mixDataIndices([ x['data_indices'] for x in self.infos.values() ])
self.mixed_data_indices = mixDataIndices(
[x["data_indices"] for x in self.infos.values()]
)
self.nb_data_units = len(self.mixed_data_indices)
self.data_index_start = self.mixed_data_indices[0][0]
self.data_index_end = self.mixed_data_indices[-1][1]
def input_names(self):
"""Returns the name of all inputs associated to this data loader"""
return self.infos.keys()
def count(self, input_name=None):
"""Returns the number of available data indexes for the given input
name. If none given the number of available data units.
@@ -249,13 +247,12 @@ class DataLoader(object):
if input_name is not None:
try:
return len(self.infos[input_name]['data_indices'])
except:
return len(self.infos[input_name]["data_indices"])
except Exception:
return 0
else:
return self.nb_data_units
def view(self, input_name, index):
""" Returns the view associated with this data loader
@@ -272,33 +269,36 @@ class DataLoader(object):
return None
try:
indices = self.infos[input_name]['data_indices'][index]
except:
indices = self.infos[input_name]["data_indices"][index]
except Exception:
return None
limited_data_indices = [ x for x in self.mixed_data_indices
if (indices[0] <= x[0]) and (x[1] <= indices[1]) ]
limited_data_indices = [
x
for x in self.mixed_data_indices
if (indices[0] <= x[0]) and (x[1] <= indices[1])
]
return DataView(self, limited_data_indices)
def __getitem__(self, index):
if index < 0:
return (None, None, None)
try:
indices = self.mixed_data_indices[index]
except:
except Exception:
return (None, None, None)
result = {}
for input_name, infos in self.infos.items():
if (indices[0] < infos['start_index']) or (infos['end_index'] < indices[0]):
(infos['data'], infos['start_index'], infos['end_index']) = \
infos['data_source'].getAtDataIndex(indices[0])
if (indices[0] < infos["start_index"]) or (infos["end_index"] < indices[0]):
(infos["data"], infos["start_index"], infos["end_index"]) = infos[
"data_source"
].getAtDataIndex(indices[0])
result[input_name] = infos['data']
result[input_name] = infos["data"]
return (result, indices[0], indices[1])
@@ -354,7 +354,6 @@ class DataLoaderList(object):
self._loaders = []
self.main_loader = None
def add(self, data_loader):
"""Add a data loader to the list
@@ -366,7 +365,6 @@ class DataLoaderList(object):
self._loaders.append(data_loader)
def __getitem__(self, name_or_index):
try:
if isinstance(name_or_index, six.string_types):
@@ -374,30 +372,24 @@ class DataLoaderList(object):
elif isinstance(name_or_index, int):
return self._loaders[name_or_index]
except:
pass
except Exception:
return None
def __iter__(self):
for i in range(len(self._loaders)):
yield self._loaders[i]
def __len__(self):
return len(self._loaders)
def loaderOf(self, input_name):
"""Returns the data loader matching the input name"""
try:
return [ k for k in self._loaders if input_name in k.input_names() ][0]
except:
return [k for k in self._loaders if input_name in k.input_names()][0]
except Exception:
return None
def secondaries(self):
"""Returns a list of all data loaders except the main one"""
@@ -43,17 +43,11 @@ Execution utilities
"""
import os
import logging
import simplejson
from ..database import Database
logger = logging.getLogger(__name__)
class DBExecutor(object):
"""Executor specialised in database views
@@ -336,7 +336,7 @@ class MessageHandler(threading.Thread):
answer = self.socket.recv() # ack
logger.debug("recv: %s", answer)
break
logger.warn(
logger.warning(
'(try %d) waited %d ms for "ack" from server', this_try, timeout
)
this_try += 1
@@ -42,14 +42,20 @@ inputs
This module implements input related classes
"""
import logging
from functools import reduce
import six
logger = logging.getLogger(__name__)
# ----------------------------------------------------------
def first(iterable, default=None):
"""Get the first item of a list or default
"""
return next(iter(iterable), default)
# ----------------------------------------------------------
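
The new `first()` helper above centralises the `[...][0]` lookups that previously relied on bare `except` blocks further down in this module. A minimal usage sketch (values are illustrative):

def first(iterable, default=None):
    """Get the first item of an iterable, or `default` when it is empty."""
    return next(iter(iterable), default)

assert first([3, 5, 7]) == 3        # first element of a non-empty iterable
assert first([]) is None            # empty iterable falls back to None
assert first([], default=0) == 0    # or to an explicit default
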
@@ -107,7 +113,6 @@ class Input(object):
self.nb_data_blocks_read = 0
self.data_source = data_source
def isDataUnitDone(self):
"""Indicates if the current data unit will change at the next iteration
"""
@@ -115,14 +120,12 @@ class Input(object):
if (self.data_index_end >= 0) and (self.group.last_data_index == -1):
return True
return (self.data_index_end == self.group.last_data_index)
return self.data_index_end == self.group.last_data_index
def hasMoreData(self):
"""Indicates if there is more data to process on the input"""
return (self.next_data_index < len(self.data_source))
return self.next_data_index < len(self.data_source)
def hasDataChanged(self):
"""Indicates if the current data unit is different than the one at the
@@ -131,21 +134,24 @@ class Input(object):
return not self.data_same_as_previous
def next(self):
"""Retrieves the next block of data"""
if self.group.restricted_access:
raise RuntimeError('Not authorized')
raise RuntimeError("Not authorized")
if self.next_data_index >= len(self.data_source):
message = "User algorithm asked for more data for channel " \
"`%s' on input `%s', but it is over (no more data). This " \
"normally indicates a programming error on the user " \
message = (
"User algorithm asked for more data for channel "
"`%s' on input `%s', but it is over (no more data). This "
"normally indicates a programming error on the user "
"side." % (self.group.channel, self.name)
)
raise RuntimeError(message)
(self.data, self.data_index, self.data_index_end) = self.data_source[self.next_data_index]
(self.data, self.data_index, self.data_index_end) = self.data_source[
self.next_data_index
]
self.data_same_as_previous = False
self.next_data_index += 1
@@ -207,8 +213,7 @@ class InputGroup:
"""
def __init__(self, channel, synchronization_listener=None,
restricted_access=True):
def __init__(self, channel, synchronization_listener=None, restricted_access=True):
self._inputs = []
self.data_index = -1 # Lower index across all inputs
@@ -219,29 +224,23 @@ class InputGroup:
self.synchronization_listener = synchronization_listener
self.restricted_access = restricted_access
def __getitem__(self, index):
if isinstance(index, six.string_types):
try:
return [x for x in self._inputs if x.name == index][0]
except:
pass
return first([x for x in self._inputs if x.name == index])
elif isinstance(index, int):
if index < len(self._inputs):
return self._inputs[index]
return None
def __iter__(self):
for k in self._inputs: yield k
for k in self._inputs:
yield k
def __len__(self):
return len(self._inputs)
def add(self, input):
def add(self, input_):
"""Add an input to the group
Parameters:
@@ -250,53 +249,67 @@
"""
input.group = self
self._inputs.append(input)
input_.group = self
self._inputs.append(input_)
def hasMoreData(self):
"""Indicates if there is more data to process in the group"""
return bool([x for x in self._inputs if x.hasMoreData()])
def next(self):
"""Retrieve the next block of data on all the inputs"""
# Only for groups not managed by the platform
if self.restricted_access:
raise RuntimeError('Not authorized')
raise RuntimeError("Not authorized")
# Only retrieve new data on the inputs where the current data expire
# first
lower_end_index = reduce(lambda x, y: min(x, y.data_index_end),
self._inputs[1:], self._inputs[0].data_index_end)
inputs_to_update = [x for x in self._inputs \
if x.data_index_end == lower_end_index]
lower_end_index = reduce(
lambda x, y: min(x, y.data_index_end),
self._inputs[1:],
self._inputs[0].data_index_end,
)
inputs_to_update = [
x for x in self._inputs if x.data_index_end == lower_end_index
]
inputs_up_to_date = [x for x in self._inputs if x not in inputs_to_update]
for input in inputs_to_update:
input.next()
for input_ in inputs_to_update:
input_.next()
for input in inputs_up_to_date:
input.data_same_as_previous = True
for input_ in inputs_up_to_date:
input_.data_same_as_previous = True
# Compute the group's start and end indices
self.data_index = reduce(lambda x, y: min(x, y.data_index),
self._inputs[1:], self._inputs[0].data_index)
self.data_index_end = reduce(lambda x, y: max(x, y.data_index_end),
self._inputs[1:], self._inputs[0].data_index_end)
self.first_data_index = reduce(lambda x, y: max(x, y.data_index),
self._inputs[1:], self._inputs[0].data_index)
self.last_data_index = reduce(lambda x, y: min(x, y.data_index_end),
self._inputs[1:], self._inputs[0].data_index_end)
self.data_index = reduce(
lambda x, y: min(x, y.data_index),
self._inputs[1:],
self._inputs[0].data_index,
)
self.data_index_end = reduce(
lambda x, y: max(x, y.data_index_end),
self._inputs[1:],
self._inputs[0].data_index_end,
)
self.first_data_index = reduce(
lambda x, y: max(x, y.data_index),
self._inputs[1:],
self._inputs[0].data_index,
)
self.last_data_index = reduce(
lambda x, y: min(x, y.data_index_end),
self._inputs[1:],
self._inputs[0].data_index_end,
)
# Inform the synchronisation listener
if self.synchronization_listener is not None:
self.synchronization_listener.onIntervalChanged(self.first_data_index,
self.last_data_index)
self.synchronization_listener.onIntervalChanged(
self.first_data_index, self.last_data_index
)
# ----------------------------------------------------------
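
Another change running through the `InputGroup` hunk above renames the `input` parameter and loop variables to `input_`, so the built-in `input()` is no longer shadowed; a trailing underscore is the common convention for such clashes. A minimal, self-contained sketch (the class is a stand-in for `InputGroup`):

from types import SimpleNamespace


class Group:
    def __init__(self):
        self._inputs = []

    def add(self, input_):
        # Naming this parameter `input` would shadow the built-in `input()`
        # inside the method body; the trailing underscore avoids that.
        input_.group = self
        self._inputs.append(input_)


group = Group()
group.add(SimpleNamespace(name="labels"))
assert len(group._inputs) == 1
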
@@ -360,7 +373,6 @@ class InputList:
self._groups = []
self.main_group = None
def add(self, group):
"""Add a group to the list
@@ -371,36 +383,30 @@ class InputList:
self._groups.append(group)
def __getitem__(self, index):
if isinstance(index, six.string_types):
try:
return [k for k in map(lambda x: x[index], self._groups) \
if k is not None][0]
except:
pass
return first(
[k for k in map(lambda x: x[index], self._groups) if k is not None]
)
elif isinstance(index, int):
for group in self._groups:
if index < len(group): return group[index]
if index < len(group):
return group[index]
index -= len(group)
return None
def __iter__(self):
for i in range(len(self)): yield self[i]
for i in range(len(self)):
yield self[i]
def __len__(self):
return reduce(lambda x, y: x + len(y), self._groups, 0)
def nbGroups(self):
"""Returns the number of groups this list belongs to"""
return len(self._groups)
def groupOf(self, input_name):
"""Returns the group which this input_name belongs to
@@ -408,25 +414,17 @@ class InputList:
:param str input_name: Name of the input for which the group should
be search for.
"""
try:
return [k for k in self._groups if k[input_name] is not None][0]
except:
return None
return first([k for k in self._groups if k[input_name] is not None])
def hasMoreData(self):
"""Indicates if there is more data to process in any group"""
return bool([x for x in self._groups if x.hasMoreData()])
def group(self, name_or_index):
"""Returns the group matching the name or index passed as parameter"""
if isinstance(name_or_index, six.string_types):
try:
return [x for x in self._groups if x.channel == name_or_index][0]
except:
return None
return first([x for x in self._groups if x.channel == name_or_index])
elif isinstance(name_or_index, int):
return self._groups[name_or_index]
else:
@@ -43,12 +43,8 @@ This module implements output related classes
"""
import six
import logging
import zmq
logger = logging.getLogger(__name__)
class SynchronizationListener:
"""A callback mechanism to keep Inputs and Outputs in groups and lists
@@ -95,7 +95,7 @@ def send_error(logger, socket, tp, message):
answer = socket.recv() # ack
logger.debug("recv: %s", answer)
break
logger.warn('(try %d) waited %d ms for "ack" from server', this_try, timeout)
logger.warning('(try %d) waited %d ms for "ack" from server', this_try, timeout)
this_try += 1
if this_try > max_tries:
logger.error("could not send error message to server")
@@ -38,7 +38,6 @@
import os
import socket
import logging
import unittest
import json
@@ -62,9 +61,6 @@ from .test_database import INTEGERS_DBS
from . import prefix
logger = logging.getLogger(__name__)
# ----------------------------------------------------------
@@ -37,7 +37,6 @@
# Tests for experiment execution
import os
import logging
import unittest
import zmq
import tempfile
@@ -62,9 +61,6 @@ from . import prefix
# ----------------------------------------------------------
logger = logging.getLogger(__name__)
CONFIGURATION = {
"queue": "queue",
"algorithm": "user/sum/1",
This diff is collapsed.
@@ -37,7 +37,6 @@
# Tests for experiment execution
import os
import logging
import unittest
import zmq
import tempfile
@@ -63,9 +62,6 @@ from ..helpers import convert_experiment_configuration_to_container
from . import prefix
logger = logging.getLogger(__name__)
# ----------------------------------------------------------
@@ -34,9 +34,6 @@
###################################################################################
import logging
logger = logging.getLogger(__name__)
import unittest
import zmq
import os
@@ -46,9 +43,6 @@ import numpy as np
from ..execution import MessageHandler
from ..dataformat import DataFormat
from ..inputs import Input
from ..inputs import InputGroup
from ..inputs import InputList
from ..data import RemoteException
from ..data import CachedDataSource
from ..data import RemoteDataSource
......@@ -59,21 +53,19 @@ from .mocks import CrashingDataSource
from . import prefix
#----------------------------------------------------------
# ----------------------------------------------------------
class TestMessageHandlerBase(unittest.TestCase):
def setUp(self):
self.filenames = []
self.data_loader = None
def tearDown(self):
for filename in self.filenames:
basename, ext = os.path.splitext(filename)
filenames = [filename]
filenames += glob.glob(basename + '*')
filenames += glob.glob(basename + "*")
for filename in filenames:
if os.path.exists(filename):
os.unlink(filename)
@@ -87,32 +79,34 @@ class TestMessageHandlerBase(unittest.TestCase):
self.data_loader = None
def create_data_loader(self, data_sources):
self.client_context = zmq.Context()
self.message_handler = MessageHandler('127.0.0.1', data_sources=data_sources, context=self.client_context)
self.message_handler = MessageHandler(
"127.0.0.1", data_sources=data_sources, context=self.client_context
)
self.message_handler.start()