Source code for spinn_utilities.make_tools.replacer

# Copyright (c) 2018 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import shutil
import struct
import sys
from types import TracebackType
from typing import Optional, Type, Tuple

from typing_extensions import Literal, Self

from spinn_utilities.overrides import overrides
from spinn_utilities.config_holder import get_config_str_or_none
from spinn_utilities.log import FormatAdapter

from .file_converter import FORMAT_EXP
from .file_converter import TOKEN
from .log_sqllite_database import DB_FILE_NAME, LogSqlLiteDatabase

logger = FormatAdapter(logging.getLogger(__name__))

# Text tag printed in front of an expanded message, keyed by the numeric
# log level (the standard ``logging`` level values 10, 20, 30 and 40).
LEVELS = {
    logging.DEBUG: "[DEBUG]",
    logging.INFO: "[INFO]",
    logging.WARNING: "[WARN]",
    logging.ERROR: "[ERROR]",
}


class Replacer(LogSqlLiteDatabase):
    """
    Performs replacements.

    A short message is a numeric log id followed by ``TOKEN``-separated
    argument values.  The id is looked up with :py:meth:`get_log_info` and
    the argument values are substituted back into the original format
    string.
    """

    @overrides(LogSqlLiteDatabase._database_file)
    def _database_file(self) -> str:
        database_file = super()._database_file()
        if os.path.isfile(database_file):
            return database_file
        # The default database is missing, so try to seed it with a copy of
        # the one in the cfg "external_binaries" directory, if there is one.
        external_binaries = get_config_str_or_none(
            "Mapping", "external_binaries")
        if external_binaries is not None:
            source_file = os.path.join(external_binaries, DB_FILE_NAME)
            if os.path.exists(source_file):
                shutil.copyfile(source_file, database_file)
        return database_file

    @overrides(LogSqlLiteDatabase._extra_database_error_message)
    def _extra_database_error_message(self) -> str:
        # NOTE: the local name is part of the message text because of the
        # f-string "=" specifier below; do not rename it.
        extra__binaries = get_config_str_or_none(
            "Mapping", "external_binaries")
        if extra__binaries is None:
            return ""
        return (f"The cfg {extra__binaries=} "
                f"also does not contain a {DB_FILE_NAME}. ")

    def __enter__(self) -> Self:
        return self

    def __exit__(self, exc_type: Optional[Type[BaseException]],
                 exc_val: Optional[BaseException],
                 exc_tb: Optional[TracebackType]) -> Literal[False]:
        # All three arguments are None when the with-block exits normally.
        # Returning False means exceptions are never suppressed.
        return False

    # Pre-compiled big-endian formats used to reinterpret transmitted hex
    # words as floating point values.
    _INT_FMT = struct.Struct("!I")
    _FLT_FMT = struct.Struct("!f")
    _DBL_FMT = struct.Struct("!d")

    def _replace(self, short: str) -> Optional[Tuple[int, str, str, str]]:
        """
        Apply the replacements to a short message.

        :param str short: The short message to apply the transform to.
        :return:
            ``(log_level, file_name, line_num, expanded_message)``, or
            ``None`` if the message does not start with a numeric id, the
            id is not known, or the argument values do not fit the format.
        :rtype: tuple(int, str, str, str) or None
        """
        parts = short.split(TOKEN)
        if not parts[0].isdigit():
            return None
        data = self.get_log_info(parts[0])
        if data is None:
            return None
        (log_level, file_name, line_num, original) = data

        # Turn escape sequences such as "\n" in the stored text into the
        # real characters.  "backslashreplace" lets characters outside
        # Latin-1 survive the round trip (they become \uXXXX escapes which
        # unicode_escape decodes again) instead of raising
        # UnicodeEncodeError; Latin-1 only text is handled exactly as before.
        replaced = original.encode(
            "latin-1", "backslashreplace").decode("unicode_escape")
        if len(parts) > 1:
            matches = FORMAT_EXP.findall(original)
            # Remove any blanks due to double spacing
            matches = [x for x in matches if x != ""]
            # Start at 0 so first i+1 puts you at 1 as part 0 is the short
            i = 0
            try:
                for match in matches:
                    i += 1
                    if match.endswith("f"):
                        replacement = str(self._hex_to_float(parts[i]))
                    elif match.endswith("F"):
                        # A double is sent as two hex words, so it uses up
                        # two parts.
                        replacement = str(self._hexes_to_double(
                            parts[i], parts[i+1]))
                        i += 1
                    else:
                        replacement = parts[i]
                    replaced = replaced.replace(match, replacement, 1)
            except Exception:  # pylint: disable=broad-except
                # Best effort: too few parts or values that are not valid
                # hex mean the message cannot be expanded.
                return None

        return (log_level, file_name, line_num, replaced)

    def replace(self, short: str) -> str:
        """
        Apply the replacements to a short message.

        :param str short: The short message to expand.
        :return:
            The expanded message prefixed with its level, file name and
            line number, or `short` unchanged if it cannot be expanded.
        :rtype: str
        """
        data = self._replace(short)
        if data is None:
            return short
        (log_level, file_name, line_num, replaced) = data
        return f"{LEVELS[log_level]} ({file_name}: {line_num}): {replaced}"

    def _hex_to_float(self, hex_str: str) -> float:
        """
        Reinterpret a hex string as the bits of a 32-bit float.

        :param str hex_str: Hex digits of an unsigned 32-bit word.
        :rtype: float
        """
        return self._FLT_FMT.unpack(
            self._INT_FMT.pack(int(hex_str, 16)))[0]

    def _hexes_to_double(self, upper: str, lower: str) -> float:
        """
        Reinterpret two hex strings as the bits of a 64-bit double.

        :param str upper: Hex digits of the most significant 32-bit word.
        :param str lower: Hex digits of the least significant 32-bit word.
        :rtype: float
        """
        return self._DBL_FMT.unpack(
            self._INT_FMT.pack(int(upper, 16)) +
            self._INT_FMT.pack(int(lower, 16)))[0]
if __name__ == '__main__':
    # Command line use: expand the one short message given as the first
    # argument and print the result.
    raw_message = sys.argv[1]
    # Every character that is not alphanumeric is swapped for TOKEN, which
    # is what the replacer splits the message on.
    pieces = (TOKEN if not char.isalnum() else char for char in raw_message)
    LINE = "".join(pieces)
    with Replacer() as replacer:
        print(replacer.replace(LINE))