# Source code for spinn_utilities.make_tools.file_converter

# Copyright (c) 2018 The University of Manchester
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import re
import sys

# Separator placed between the format specifiers of a shortened log message
TOKEN = chr(30)  # Record Separator

# NOTE: the misspelt name is kept as other modules may import it
COMMA_SPLIITER = re.compile(r'(?!\B"[^"]*),(?![^"]*"\B)')
STRING_REGEXP = re.compile(r'"([^"]|\\"|(""))*"')
# A printf style format specifier, including any run of leading % characters
FORMAT_EXP = re.compile(r"(%+\d*(?:\.\d+)?[cdfiksuxRF])")
# The closing bracket and semicolon that end a log call
LOG_END_REGEX = re.compile(r'\)(\s)*;')
# The C end of comment marker.
# The * must be escaped; unescaped the pattern matched any run of slashes
END_COMMENT_REGEX = re.compile(r"\*/")
# The name and opening bracket of one of the supported log calls
LOG_START_REGEX = re.compile(
    r"log_((info)|(error)|(debug)|(warning))(\s)*\(")
# Parameters used to pass a double as its upper and lower halves
DOUBLE_HEX = ", double_to_upper({0}), double_to_lower({0})"

# Status values
NORMAL_CODE = 0
COMMENT = NORMAL_CODE + 1
IN_LOG = COMMENT + 1
IN_LOG_CLOSE_BRACKET = IN_LOG + 1

# Replacement call for each supported log call
MINIS = {"log_info(": "log_mini_info(",
         "log_error(": "log_mini_error(",
         "log_debug(": "log_mini_debug(",
         "log_warning(": "log_mini_warning("}

# Level text written to the dictionary file for each supported log call
LEVELS = {"log_info(": "[INFO]",
          "log_error(": "[ERROR]",
          "log_debug(": "[DEBUG]",
          "log_warning(": "[WARNING]"}

MAX_LOG_PER_FILE = 100


class FileConverter(object):
    """ Converts one C source file into a copy with shortened log calls.

    Each ``log_info``, ``log_error``, ``log_debug`` and ``log_warning`` call
    is replaced by the matching ``log_mini_*`` call whose format string
    holds only a message id and the format specifiers. The original message
    is appended to a dictionary file keyed by that id.

    The file is handled line by line by a small state machine
    (normal code, comment, in log call, in log call after a closing
    bracket). Lines too complex to handle whole are handled by character.
    """

    __slots__ = [
        # Full path of the destination file
        "_dest",
        # File to hold dictionary mappings
        "_dict",
        # original c log method found, with any white space removed
        # variable created each time a log method found
        "_log",
        # Text of the log call found so far (after the opening bracket)
        # variable created each time a log method found
        "_log_full",
        # Number of c lines the log method takes
        # variable created each time a log method found
        "_log_lines",
        # Position in its line at which the log method starts
        # variable created each time a log method found
        "_log_start",
        # Id of the last message written
        "_message_id",
        # The previous state
        # variable created when a comment found
        "_previous_status",
        # Full path of the source file
        "_src",
        # Current status of state machine
        "_status",
        # Number of extra lines written to modified not yet recovered
        # Extra lines are caused by the header and possibly log comment
        # Extra lines are recovered by omitting blank lines
        "_too_many_lines"
    ]

    def __init__(self, src, dest, dict_file):
        """ Creates the file_convertor to convert one file

        :param src: Path of the source file
        :type src: str
        :param dest: Path of the destination file
        :type dest: str
        :param dict_file: File to hold dictionary mappings
        :type dict_file: str
        """
        self._src = os.path.abspath(src)
        self._dest = os.path.abspath(dest)
        self._dict = dict_file
        self._log = None
        self._log_full = None
        self._log_lines = None
        self._log_start = None
        self._message_id = None
        self._previous_status = None
        self._status = None
        self._too_many_lines = None

    def _run(self, range_start):
        """ Runs the file converter

        WARNING. This code is absolutely not thread safe.
        Interwoven calls even on different FileConverter objects is
        dangerous!
        It is highly likely that dict files become corrupted and the same
        message_id is used multiple times.

        :param range_start: id of last dictionary key used
        :type range_start: int
        :return: The last message id use which can in turn be passed into
            the next FileConverter
        :raises Exception: If the source file does not exist
        """
        self._message_id = range_start
        if not os.path.exists(self._src):
            raise Exception("Unable to locate source {}".format(self._src))
        dest_dir = os.path.dirname(os.path.realpath(self._dest))
        if not os.path.exists(dest_dir):
            os.makedirs(dest_dir)
        with open(self._src) as src_f, open(self._dest, 'w') as dest_f:
            dest_f.write(
                "// DO NOT EDIT! THIS FILE WAS GENERATED FROM {}\n\n"
                .format(self.unique_src()))
            # The header above added two lines
            self._too_many_lines = 2
            self._status = NORMAL_CODE
            for line_num, text in enumerate(src_f):
                if self._too_many_lines > 0:
                    # Try to recover the lines added by do not edit
                    check = text.strip()
                    if len(check) == 0 or check == "*":
                        self._too_many_lines -= 1
                        continue
                previous_status = self._status
                if not self._process_line(dest_f, line_num, text):
                    # Undo any status change made by the failed attempt
                    self._status = previous_status
                    self._process_chars(dest_f, line_num, text)
        return self._message_id

    def _process_line(self, dest_f, line_num, text):
        """ Process a single line

        :param dest_f: Open file like Object to write modified source to
        :param line_num: Line number in the source c file
        :param text: Text of that line including whitespace
        :return: True if and only if the whole line was processed
        """
        if self._status == COMMENT:
            return self._process_line_in_comment(dest_f, text)
        if "/*" in text:
            return self._process_line_comment_start(dest_f, line_num, text)

        if self._status == IN_LOG:
            return self._process_line_in_log(dest_f, line_num, text)

        if self._status == IN_LOG_CLOSE_BRACKET:
            return self._process_line_in_log_close_bracket(
                dest_f, line_num, text)

        assert self._status == NORMAL_CODE
        return self._process_line_normal_code(dest_f, line_num, text)

    def _process_line_in_comment(self, dest_f, text, search_from=0):
        """ Process a single line when in a multi line comment /* .. */

        When the comment ends at the end of the line the status goes back
        to what it was before the comment started, so that a comment inside
        a multi line log call does not lose the log call.

        :param dest_f: Open file like Object to write modified source to
        :param text: Text of that line including whitespace
        :param search_from: Position in the stripped line at which to start
            looking for the comment end. Used to skip the /* on the line
            that opens the comment so that /*/ is not seen as an end.
        :return: True if and only if the whole line was processed
        """
        stripped = text.strip()
        end = stripped.find("*/", search_from)
        if end < 0:
            # Whole line in comment without end
            dest_f.write(text)
            return True
        if end + 2 == len(stripped):
            # OK Comment until end of line
            dest_f.write(text)
            self._status = self._previous_status
            return True
        return False  # Stuff after comment so check by character

    def _process_line_comment_start(self, dest_f, line_num, text):
        """ Processes a line known assumed to contain a /* but not know where

        There is also the assumption that the start status is not COMMENT

        :param dest_f: Open file like Object to write modified source to
        :param line_num: Line number in the source c file
        :param text: Text of that line including whitespace
        :return: True if and only if the whole line was processed
        """
        stripped = text.strip()
        if stripped.startswith("/*"):
            self._previous_status = self._status
            self._status = COMMENT
            # Comment start so now check for comment end after the /*
            return self._process_line_in_comment(dest_f, text, 2)
        # Stuff before comment so check by char
        return False

    def _process_line_in_log(self, dest_f, line_num, text):
        """ Process a line when the status is a log call has been started

        :param dest_f: Open file like Object to write modified source to
        :param line_num: Line number in the source c file
        :param text: Text of that line including whitespace
        :return: True if and only if the whole line was processed
        """
        stripped = text.strip()
        if stripped.startswith("//"):
            # Just a comment line so write and move on
            dest_f.write(text)
            return True

        match = LOG_END_REGEX.search(stripped)
        if not match:
            if stripped[-1:] == ")":
                # possible start of end
                self._status = IN_LOG_CLOSE_BRACKET
            self._log_full += stripped
            self._log_lines += 1
            return True
        if match.end(0) < len(stripped):
            # Stuff after the log_end so check by char
            return False
        self._log_lines += 1
        self._log_full += stripped
        self._write_log_method(dest_f, line_num)
        self._status = NORMAL_CODE
        return True

    def _process_line_in_log_close_bracket(self, dest_f, line_num, text):
        """ Process where the last log line has the ) but not the ;

        :param dest_f: Open file like Object to write modified source to
        :param line_num: Line number in the source c file
        :param text: Text of that line including whitespace
        :return: True if and only if the whole line was processed
        """
        stripped = text.strip()
        if len(stripped) == 0:
            self._log_lines += 1
            return True
        if stripped[0] == ";":
            if stripped == ";":
                self._log_full += (";")
                self._log_lines += 1
                self._write_log_method(dest_f, line_num)
                self._status = NORMAL_CODE
                return True
            # Stuff after the ; so check by char
            return False
        if stripped.startswith("//"):
            # Just a comment line so write and move on
            dest_f.write(text)
            return True
        # so not a closing bracket so set status back
        self._status = IN_LOG
        return self._process_line_in_log(dest_f, line_num, text)

    def _process_line_normal_code(self, dest_f, line_num, text):
        """ Process a line where the status is normal code

        :param dest_f: Open file like Object to write modified source to
        :param line_num: Line number in the source c file
        :param text: Text of that line including whitespace
        :return: True if and only if the whole line was processed
        """
        stripped = text.strip()
        match = LOG_START_REGEX.search(stripped)
        if not match:
            # No log start found after all
            dest_f.write(text)
            return True
        if match.start() > 0:
            if stripped.startswith("//"):
                # Just a comment line so write and move on
                dest_f.write(text)
                return True
            # Stuff before the log_start so check by character
            return False
        if LOG_START_REGEX.search(stripped, 1):
            # Second start found so check by character
            return False
        # remove white spaces and save log command
        found = match.group(0)
        self._log_start = text.index(found)
        self._log = "".join(found.split())
        # Skip the text as found in the line. The saved command has had
        # white space removed so may be shorter than what has to be skipped
        start_len = self._log_start + len(found)
        self._status = IN_LOG
        self._log_full = ""  # text saved in process_line_in_log
        self._log_lines = 0
        # Now check for the end of log command
        return self._process_line_in_log(dest_f, line_num, text[start_len:])

    def quote_part(self, text):
        """ Checks if the text holds an odd number of unescaped quotes

        :param text: Text to check
        :return: True if the quotes not directly after a backslash
            are odd in number
        """
        return (text.count('"') - text.count('\\"')) % 2 > 0

    def bracket_count(self, text):
        """ Counts how many more opening than closing brackets there are

        :param text: Text to check
        :return: Number of ( minus the number of )
        """
        return (text.count('(') - text.count(')'))

    def split_by_comma_plus(self, main, line_num):
        """ Splits the parameters of a log call at the top level commas

        Commas inside a string literal or inside the brackets of a nested
        call do not split. A backslash inside a string literal escapes the
        character after it. If the first part starts and ends with a quote
        those two quotes are removed.

        :param main: The parameters of the log call without the closing );
        :param line_num: Line number in the source c file (for errors)
        :return: List of the parameters as found so including white space
        :raises Exception: If a parameter is empty, a string is not closed
            or a bracket is not closed
        """
        try:
            parts = []
            start = 0
            depth = 0
            in_string = False
            escaped = False
            for pos, char in enumerate(main):
                if escaped:
                    # Char after a backslash in a string is never an end
                    escaped = False
                elif in_string:
                    if char == '\\':
                        escaped = True
                    elif char == '"':
                        in_string = False
                elif char == '"':
                    in_string = True
                elif char == '(':
                    depth += 1
                elif char == ')':
                    # An unmatched closing bracket is ignored
                    if depth > 0:
                        depth -= 1
                elif char == ',' and depth == 0:
                    parts.append(main[start:pos])
                    start = pos + 1
            parts.append(main[start:])
            if in_string or depth > 0:
                raise ValueError("Unclosed string or bracket")
            for part in parts:
                if not part.strip():
                    raise ValueError("Empty parameter")
            if parts[0][0] == '"' and parts[0][-1] == '"':
                parts[0] = parts[0][1:-1]
            return parts

        except Exception:
            raise Exception("Unexpected line {} at {} in {}".format(
                self._log_full, line_num, self._src))

    def _short_log(self, line_num):
        """ shortens the log string message and adds the id

        Assumes that self._message_id has already been updated

        :param line_num: Line number in the source c file (for errors)
        :return: the original message and the rest of the new log call
            (from the format string to the closing );)
        :raises Exception: If the log call can not be parsed or the number
            of parameters does not match the format string
        """
        try:
            match = LOG_END_REGEX.search(self._log_full)
            main = self._log_full[:-len(match.group(0))]
        except Exception:
            raise Exception("Unexpected line {} at {} in {}".format(
                self._log_full, line_num, self._src))
        parts = self.split_by_comma_plus(main, line_num)
        original = parts[0]
        # Each %% is a literal percent so does not take a parameter
        count = original.count("%") - original.count("%%") * 2

        if count == 0:
            return original, '"%u", {});'.format(self._message_id)

        front = '"%u'
        back = ""
        matches = [x for x in FORMAT_EXP.findall(original)
                   if not x.startswith("%%")]
        if len(matches) != count:
            raise Exception(
                "Unexpected formatString in {}".format(original))
        if len(parts) < count + 1:
            raise Exception(
                "Too few parameters in line {} at {} in {}".format(
                    self._log_full, line_num, self._src))
        if len(parts) > count + 1:
            raise Exception(
                "Too many parameters in line {} at {} in {}".format(
                    self._log_full, line_num, self._src))
        for i, match in enumerate(matches):
            front += TOKEN
            if match.endswith("f"):
                # floats are passed as their bits in an int
                front += "%x"
                back += ", float_to_int({})".format(parts[i + 1])
            elif match.endswith("F"):
                # doubles are passed as two values
                front += "%x" + TOKEN + "%x"
                back += DOUBLE_HEX.format(parts[i + 1])
            else:
                front += match
                back += ", {}".format(parts[i + 1])
        front += '", {}'.format(self._message_id)
        back += ");"
        return original, front + back

    def _write_log_method(self, dest_f, line_num, tail=""):
        """ Writes the log message and the dict value

        Writes the log call to the destination

        - New log method used
        - Shortened log message (with just an id) used
        - Parameters kept as is
        - Old log message with full text added as comment

        Adds the data to the dict file including

        - key/id
        - log level
        - file name
        - line number
        - original message

        :param dest_f: Open file like Object to write modified source to
        :param line_num: Line number in the source c file
        :param tail: Text to write before each newline written
        """
        self._message_id += 1
        self._log_full = self._log_full.replace('""', '')
        original, short_log = self._short_log(line_num)

        dest_f.write(" " * self._log_start)
        dest_f.write(MINIS[self._log])
        dest_f.write(short_log)
        if self._log_lines == 0:
            # Writing an extra newline here so need to recover that ASAP
            self._too_many_lines += 1
        end = tail + "\n"
        if self._log_lines <= 1:
            dest_f.write(" /* ")
            dest_f.write(self._log)
            dest_f.write(self._log_full)
            dest_f.write("*/")
            dest_f.write(end)
        else:
            dest_f.write(tail)
            dest_f.write(end)
            dest_f.write(" " * self._log_start)
            dest_f.write("/* ")
            dest_f.write(self._log)
            dest_f.write(self._log_full)
            dest_f.write("*/")
            # Pad so the output has as many lines as the original log call
            dest_f.write(end * (self._log_lines - 1))
        with open(self._dict, 'a') as mess_f:
            # Remove commas from filenames for csv format
            # Remove start and end quotes from original
            mess_f.write("{},{} ({}: {}): ,{}\n".format(
                self._message_id, LEVELS[self._log],
                os.path.basename(self._src).replace(",", ";"),
                line_num + 1,
                original))

    def _process_chars(self, dest_f, line_num, text):
        """ Deals with complex lines that can not be handled in one go

        :param dest_f: Open file like Object to write modified source to
        :param line_num: Line number in the source c file
        :param text: Text of that line including whitespace
        :raises Exception: If a string literal is not closed on the line
        """
        if not text.endswith("\n"):
            # The last line of a file may have no newline but the checks
            # below all depend on finding one
            text += "\n"
        pos = 0
        write_flag = 0
        while text[pos] != "\n":
            if self._status == COMMENT:
                if text[pos] == "*" and text[pos+1] == "/":
                    dest_f.write(text[write_flag:pos + 2])
                    pos = pos + 2
                    write_flag = pos
                    self._status = self._previous_status
                else:
                    pos = pos + 1
            elif text[pos] == "/":
                if text[pos+1] == "*":
                    if self._status == IN_LOG:
                        self._log_full += text[write_flag:pos].strip()
                        # endswith as nothing may have been saved yet
                        if self._log_full.endswith(")"):
                            self._status = IN_LOG_CLOSE_BRACKET
                        # NO change to self._log_lines as newline not removed
                    else:
                        dest_f.write(text[write_flag:pos])
                    write_flag = pos
                    pos = pos + 2  # leave the /* as not written
                    self._previous_status = self._status
                    self._status = COMMENT
                elif text[pos+1] == "/":
                    if self._status == IN_LOG:
                        self._log_full += text[write_flag:pos].strip()
                        # NO change to self._log_lines as newline not removed
                        dest_f.write(text[pos:])
                    else:
                        dest_f.write(text[write_flag:])
                    return  # Finished line
                else:
                    pos += 1
            elif text[pos] == '"':
                str_pos = pos + 1
                while text[str_pos] != '"':
                    if text[str_pos] == "\n":
                        raise Exception(
                            "Unclosed string literal in {} at line: {}".
                            format(self._src, line_num))
                    elif text[str_pos] == "\\":
                        if text[str_pos+1] == "\n":
                            raise Exception(
                                "Unclosed string literal in {} at line: {}".
                                format(self._src, line_num))
                        else:
                            str_pos += 2  # ignore next char which may be a "
                    else:
                        str_pos += 1
                pos = str_pos + 1
                continue
            elif self._status == IN_LOG:
                if text[pos] == ")":
                    match = LOG_END_REGEX.match(text[pos:])
                    if match:
                        # include the end
                        pos = pos + len(match.group(0))
                        self._log_full += text[write_flag:pos].strip()
                        self._status = NORMAL_CODE
                        if text[pos:].strip():  # Stuff left
                            write_flag = pos
                            # self._log_lines not changed as no newline
                            # check for a \ after the log
                            if text[pos:].strip() == "\\":
                                self._write_log_method(
                                    dest_f, line_num, "\\")
                                return
                            else:
                                self._write_log_method(dest_f, line_num)
                        else:
                            self._log_lines += 1
                            self._write_log_method(dest_f, line_num)
                            return  # Finished line
                    else:
                        # not the require ); so continue
                        pos += 1
                else:
                    pos += 1
            elif self._status == IN_LOG_CLOSE_BRACKET:
                stripped = text.strip()
                if stripped[0] == ";":
                    self._log_full += (";")
                    self._write_log_method(dest_f, line_num)
                    pos = text.index(";") + 1
                    write_flag = pos
                    self._status = NORMAL_CODE
                else:
                    # Save the ) as not part of the end
                    self._status = IN_LOG
            elif text[pos] == "l":
                match = LOG_START_REGEX.match(text[pos:])
                if match:
                    self._log_start = text.index(match.group(0))
                    self._log = "".join(match.group(0).split())
                    self._status = IN_LOG
                    self._log_full = ""  # text saved after while
                    self._log_lines = 0
                    dest_f.write(text[write_flag:pos])
                    # written up to not including log_start
                    # skip to end of log instruction
                    pos = pos + len(match.group(0))
                    write_flag = pos
                else:
                    # Not a log start after all so treat as normal test
                    pos += 1
            else:
                pos += 1

        # after while text[pos] != "\n"
        if self._status == IN_LOG:
            self._log_full += text[write_flag:].strip()
            self._log_lines += 1
        else:
            dest_f.write(text[write_flag:])

    def unique_src(self):
        """ Returns the part of the source path which is different

        For example assuming a source of
        /spinnaker/sPyNNaker/neural_modelling/src/common/in_spikes.h
        /spinnaker/sPyNNaker/neural_modelling/modified_src/common/in_spikes.h
        returns src/common/in_spikes.h

        :return: The source path from the first directory that is not
            shared with the destination path
        """
        pos = 0
        last_sep = 0
        while pos < len(self._src) and pos < len(self._dest) \
                and self._src[pos] == self._dest[pos]:
            if self._src[pos] == os.path.sep:
                last_sep = pos + 1
            pos += 1
        return self._src[last_sep:]

    @staticmethod
    def convert(src, dest, dict_file, range_start):
        """ Static method to create Object and do the conversion

        :param src: Path of the source file
        :type src: str
        :param dest: Path of the destination file
        :type dest: str
        :param dict_file: File to hold dictionary mappings
        :type dict_file: str
        :param range_start: id of last dictionary key used
        :type range_start: int
        :return: The last message id use which can in turn be passed into
            the next call
        """
        converter = FileConverter(src, dest, dict_file)
        return converter._run(range_start)  # pylint: disable=protected-access
if __name__ == '__main__':
    # Command line use. Arguments are the source file, the destination file,
    # the dictionary file and the id of the last dictionary key used.
    if len(sys.argv) < 5:
        # Give a usage message rather than an IndexError traceback
        sys.exit("Usage: {} <src> <dest> <dict_file> <range_start>".format(
            sys.argv[0]))
    _src = sys.argv[1]
    _dest = sys.argv[2]
    _dict_file = sys.argv[3]
    _range_start = int(sys.argv[4])
    FileConverter.convert(_src, _dest, _dict_file, _range_start)