# Source code for spinn_utilities.log
# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import atexit
import configparser
from datetime import datetime
import logging
import re
import sys
from typing import (Any, Collection, Dict, KeysView, List, Mapping, Optional,
Tuple)
from inspect import getfullargspec
from spinn_utilities.configs import CamelCaseConfigParser
from .log_store import LogStore
from .overrides import overrides
# Level names accepted in the "Logging" configuration section, mapped to the
# numeric levels of the standard logging module (e.g. 'debug' -> logging.DEBUG).
_LEVELS = {
    name: getattr(logging, name.upper())
    for name in ('debug', 'info', 'warning', 'error', 'critical')
}
[docs]
class ConfiguredFilter(object):
    """
    Allow a parent logger to filter a child logger.

    The threshold applied to a record is the level configured for the
    deepest configured parent of the record's logger name; when no parent
    is configured the default level is used instead.
    """

    __slots__ = [
        "_default_level", "_levels"]

    def __init__(self, conf: configparser.RawConfigParser):
        """
        :param conf:
            Configuration whose ``Logging`` section supplies the per-module
            levels and, optionally, the ``default`` level name.
        """
        self._levels = ConfiguredFormatter.construct_logging_parents(conf)
        if conf.has_option("Logging", "default"):
            # The configured name must be one of the keys of _LEVELS
            self._default_level = _LEVELS[conf.get("Logging", "default")]
        else:
            self._default_level = logging.INFO

    def filter(self, record: logging.LogRecord) -> bool:
        """
        Get the level for the deepest parent, and filter appropriately.

        :param record: The record being considered for output
        :return: True if the record is at or above the applicable level
        """
        threshold = ConfiguredFormatter.level_of_deepest_parent(
            self._levels, record.name)
        if threshold is None:
            # No configured parent for this logger name
            threshold = self._default_level
        return record.levelno >= threshold
[docs]
class ConfiguredFormatter(logging.Formatter):
    """
    Defines the logging format for the SpiNNaker host software.

    Also hosts the static helpers that map a logger name onto the level
    configured for its deepest configured parent.
    """

    # Precompiled as it is applied once per stripped name component.
    # Uses ``[^.]*`` rather than ``[^.]+`` so that a dot with nothing after
    # it is stripped as well; otherwise a name such as "a." or "a..b" would
    # never get shorter and deepest_parent would loop forever.
    __last_component = re.compile(r'\.[^.]*$')

    def __init__(self, conf: CamelCaseConfigParser) -> None:
        """
        :param conf:
            Configuration to read; when ``Logging``/``default`` is exactly
            ``debug`` the path name of the source file is added to every
            formatted message.
        """
        if (conf.has_option("Logging", "default") and
                conf.get("Logging", "default") == "debug"):
            fmt = "%(asctime)-15s %(levelname)s: %(pathname)s: %(message)s"
        else:
            fmt = "%(asctime)-15s %(levelname)s: %(message)s"
        super().__init__(fmt=fmt, datefmt="%Y-%m-%d %H:%M:%S")

    @staticmethod
    def construct_logging_parents(
            conf: configparser.RawConfigParser) -> Dict[str, int]:
        """
        Create a dictionary of module names and logging levels.

        Each option of the ``Logging`` section that is named after a level
        (``debug``, ``info``, ...) holds a comma separated list of module
        names to log at that level.

        :param conf: The configuration to read
        :return:
            Mapping of module name to numeric level; empty if there is no
            ``Logging`` section.
        """
        levels: Dict[str, int] = {}
        if not conf.has_section("Logging"):
            return levels

        for label, level in _LEVELS.items():
            if conf.has_option("Logging", label):
                modules = [s.strip() for s in
                           conf.get('Logging', label).split(',')]
                # An empty value (or any empty item in the list) means the
                # whole option is ignored.
                if '' not in modules:
                    levels.update(dict.fromkeys(modules, level))
        return levels

    @staticmethod
    def deepest_parent(parents: KeysView[str], child: str) -> Optional[str]:
        """
        Greediest match between child and parent.

        :param parents: The configured module names
        :param child: The dotted logger name to look up
        :return:
            The longest dot-separated prefix of ``child`` (possibly
            ``child`` itself) found in ``parents``, or ``None`` if there is
            no such prefix.
        """
        # Repeatedly strip the last component off the child until we match
        # an item in parents or run out of components.
        match = child
        while '.' in match and match not in parents:
            match = ConfiguredFormatter.__last_component.sub('', match)

        # If nothing matched there is no deepest parent
        return match if match in parents else None

    @staticmethod
    def level_of_deepest_parent(
            parents: Dict[str, int], child: str) -> Optional[int]:
        """
        The logging level of the greediest match between child and parent.

        :param parents: Mapping of configured module name to numeric level
        :param child: The dotted logger name to look up
        :return:
            The level of the deepest configured parent, or ``None`` if no
            parent of ``child`` is configured.
        """
        parent = ConfiguredFormatter.deepest_parent(parents.keys(), child)
        return None if parent is None else parents[parent]
class _BraceMessage(object):
"""
A message that converts a Python format string to a string.
"""
__slots__ = [
"args", "fmt", "kwargs"]
def __init__(self, fmt: object,
args: Collection, kwargs: Dict[str, object]) -> None:
self.fmt = fmt
self.args = args
self.kwargs = kwargs
def __str__(self) -> str:
try:
return str(self.fmt).format(*self.args, **self.kwargs)
except KeyError:
try:
if (not self.args and not self.kwargs and
isinstance(self.fmt, str)):
# nothing to format with so assume brackets not formatting
return self.fmt
return "KeyError: " + str(self.fmt)
except KeyError:
return "Double KeyError"
except IndexError:
try:
if self.args or self.kwargs:
return "IndexError: " + str(self.fmt)
else:
# nothing to format with so assume brackets not formatting
return str(self.fmt)
except IndexError:
return "Double IndexError"
[docs]
class LogLevelTooHighException(Exception):
    """
    An Exception thrown when the System tries to log at a level where an
    Exception is a better option.

    Raised by :py:meth:`FormatAdapter.log` when the level of the message is
    at or above the kill level; the message is the exception's argument.
    """
[docs]
class FormatAdapter(logging.LoggerAdapter):
    """
    An adaptor for logging with messages that uses Python format strings.

    Example::

        log = FormatAdapter(logging.getLogger(__name__))
        log.info("this message has {} inside {}", 123, 'itself')
        # --> INFO: this message has 123 inside itself

    Every message that is enabled for the wrapped logger is also passed to
    the registered log store, or buffered in memory while there is none,
    so that :py:meth:`atexit_handler` can repeat the important ones.

    .. note::
        The kill level, the log store and the buffer of not stored messages
        are shared by all instances. They are always read and written on
        ``FormatAdapter`` itself, so calling the class methods through a
        subclass affects the same shared state.
    """

    # Level at or above which log() raises instead of logging
    __kill_level = logging.CRITICAL + 1
    # Minimum level of the messages repeated by atexit_handler
    __repeat_at_end = logging.WARNING
    # (timestamp, level, text) of the messages not written to a log store
    __not_stored_messages: List[Tuple[datetime, int, str]] = []
    # Where messages are stored; None until set_log_store provides one
    __log_store: Optional[LogStore] = None

    @classmethod
    def set_kill_level(cls, level: Optional[int] = None) -> None:
        """
        Allow system to change the level at which a log is changed to an
        Exception.

        .. note::
            This is a static method; it affects all log messages.

        :param int level:
            The level to set. The values in :py:mod:`logging` are recommended.
            ``None`` restores the default of ``logging.CRITICAL + 1``.
        """
        if level is None:
            level = logging.CRITICAL + 1
        # Written on FormatAdapter (not cls) as that is where log() reads it
        FormatAdapter.__kill_level = level

    @classmethod
    def set_log_store(cls, log_store: Optional[LogStore]) -> None:
        """
        Sets a Object to write the log messages to

        Any messages buffered so far are written to the new store.

        :param LogStore log_store:
            The store to use, or ``None`` to go back to buffering in memory.
        :raises TypeError: If log_store is neither None nor a LogStore
        """
        if log_store is not None and not isinstance(log_store, LogStore):
            raise TypeError("log_store must be a LogStore")
        FormatAdapter.__log_store = log_store
        if log_store is not None:
            for timestamp, level, message in cls._pop_not_stored_messages():
                log_store.store_log(level, message, timestamp)

    def __init__(
            self, logger: logging.Logger,
            extra: Optional[Mapping[str, object]] = None) -> None:
        """
        :param logger: The logger to delegate to
        :param extra: Passed on to :py:class:`logging.LoggerAdapter`
        """
        if extra is None:
            extra = {}
        super().__init__(logger, extra)
        # Call the low level method directly; log() has already done the
        # isEnabledFor check and the formatting.
        self.do_log = logger._log  # pylint: disable=protected-access

    @overrides(logging.LoggerAdapter.log, extend_doc=False, adds_typing=True)
    def log(self, level: int, msg: object,
            *args: object, **kwargs: object) -> None:
        """
        Delegate a log call to the underlying logger, applying appropriate
        transformations to allow the log message to be written using
        Python format string, rather than via `%`-substitutions.

        :raises LogLevelTooHighException:
            If level is at or above the kill level
        :raises Exception:
            Whatever the log store raised while storing the message; the
            store is then dropped and the message buffered instead.
        """
        if level >= FormatAdapter.__kill_level:
            raise LogLevelTooHighException(_BraceMessage(msg, args, kwargs))
        if not self.isEnabledFor(level):
            return

        message = _BraceMessage(msg, args, kwargs)
        # Render once, and before touching the store, so that a message
        # that cannot be formatted is not mistaken for a store failure.
        text = str(message)
        store = FormatAdapter.__log_store
        if store is not None:
            try:
                store.store_log(level, text)
            except Exception as ex:
                # Avoid an endless loop of log store errors being logged
                FormatAdapter.__not_stored_messages.append((
                    datetime.now(),
                    level,
                    f"Unable to store log messages in database due to"
                    f" {ex}"))
                FormatAdapter.__not_stored_messages.append(
                    (datetime.now(), level, text))
                FormatAdapter.__log_store = None
                raise
        else:
            FormatAdapter.__not_stored_messages.append(
                (datetime.now(), level, text))

        _, log_kwargs = self.process(msg, kwargs)
        if "exc_info" in kwargs:
            log_kwargs["exc_info"] = kwargs["exc_info"]
        self.do_log(level, message, (), **log_kwargs)

    @overrides(logging.LoggerAdapter.process, extend_doc=False,
               adds_typing=True)
    def process(self, msg: object, kwargs: Any) -> Tuple[object, dict]:
        """
        Process the logging message and keyword arguments passed in to a
        logging call to insert contextual information. You can either
        manipulate the message itself, the keyword arguments or both.
        Return the message and *kwargs* modified (or not) to suit your needs.

        This implementation returns the message unchanged and keeps only
        the keyword arguments that the low level log method accepts as
        options; all others are values for the format string.
        """
        spec = getfullargspec(self.do_log)
        # Only the parameters with a default are options. The leading ones
        # (self, level, msg, args) are always passed positionally by log(),
        # so forwarding a format value with one of those names would fail
        # with "got multiple values for argument".
        optional = spec.args[len(spec.args) - len(spec.defaults or ()):]
        return msg, {key: kwargs[key] for key in optional if key in kwargs}

    @classmethod
    def atexit_handler(cls) -> None:
        """
        Adds code to print out high level log messages python run ends

        Writes to stderr how many messages were logged at the repeat level
        or above, followed by at most the first 10 of them. Prints nothing
        if there were none.
        """
        messages: List[str] = []
        store = FormatAdapter.__log_store
        if store is not None:
            try:
                # Copied so that the list owned by the store is not changed
                messages = list(store.retreive_log_messages(
                    FormatAdapter.__repeat_at_end))
            except Exception:  # pylint: disable=broad-except
                # Best effort while shutting down; fall back to whatever
                # is still buffered in memory.
                pass
        messages.extend(
            text for _, _, text in
            cls._pop_not_stored_messages(FormatAdapter.__repeat_at_end))
        if not messages:
            return

        level = logging.getLevelName(FormatAdapter.__repeat_at_end)
        print(f"\n!WARNING: {len(messages)} log messages were "
              f"generated at level {level} or above.", file=sys.stderr)
        print("This may mean that the results are invalid.",
              file=sys.stderr)
        if store is not None:
            print("You are advised to check the details of these in "
                  "the p_log_view of : "
                  f"{store.get_location()}", file=sys.stderr)
        if len(messages) < 10:
            print("These are:", file=sys.stderr)
        else:
            print("The first 10 are:", file=sys.stderr)
        for message in messages[0:10]:
            print(message, file=sys.stderr)

    @classmethod
    def _pop_not_stored_messages(
            cls, min_level: int = 0) -> List[Tuple[datetime, int, str]]:
        """
        Returns the log of messages to print on exit and
        *clears that log*.

        .. note::
            Should only be called externally from test code!

        :param min_level:
            Only messages at this level or above are returned. The
            messages below it are discarded as well.
        :return: (timestamp, level, text) of each matching message
        """
        # Swap the buffer out first so it is cleared whatever happens next
        pending = FormatAdapter.__not_stored_messages
        FormatAdapter.__not_stored_messages = []
        return [entry for entry in pending if entry[1] >= min_level]
# Registered when this module is imported so that the summary of the
# important log messages is printed at normal interpreter termination.
atexit.register(FormatAdapter.atexit_handler)