Source code for spinn_utilities.make_tools.log_sqllite_database

# Copyright (c) 2017 The University of Manchester
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sqlite3
import sys
import time
from typing import Optional, Tuple
from spinn_utilities.abstract_context_manager import AbstractContextManager

_DDL_FILE = os.path.join(os.path.dirname(__file__), "db.sql")
_SECONDS_TO_MICRO_SECONDS_CONVERSION = 1000
DB_FILE_NAME = "logs.sqlite3"


def _timestamp():
    return int(time.time() * _SECONDS_TO_MICRO_SECONDS_CONVERSION)


[docs] class LogSqlLiteDatabase(AbstractContextManager): """ Specific implementation of the Database for SQLite 3. .. note:: **Not thread-safe on the same database.** Threads can access different DBs just fine. .. note:: This totally relies on the way SQLite's type affinities function. You can't port to a different database engine without a lot of work. """ __slots__ = [ # the database holding the data to store "_db", ] def __init__(self, new_dict=False): """ Connects to a log dict. The location of the file can be overridden using the ``C_LOGS_DICT`` environment variable. :param bool new_dict: Flag to say if this is a new dict or not. If True, clears and previous values. If False, makes sure the dict exists. """ # To Avoid an Attribute error on close after an exception self._db = None database_file = os.environ.get('C_LOGS_DICT', None) if database_file is None: script = sys.modules[self.__module__].__file__ directory = os.path.dirname(script) database_file = os.path.join(directory, DB_FILE_NAME) if not new_dict and not os.path.exists(database_file): message = f"Unable to locate c_logs_dict at {database_file}. " if 'C_LOGS_DICT' in os.environ: message += ( "This came from the environment variable 'C_LOGS_DICT'. ") message += "Please rebuild the C code." raise FileNotFoundError(message) try: self._db = sqlite3.connect(database_file) self.__init_db() if new_dict: self.__clear_db() except Exception as ex: message = f"Error accessing c_logs_dict at {database_file}. " if 'C_LOGS_DICT' in os.environ: message += ( "This came from the environment variable 'C_LOGS_DICT'. ") else: message += ( "This is the default location. Set environment " "variable 'C_LOGS_DICT' to use somewhere else.") if new_dict: message += "Check this is a location with write access." else: message += "Please rebuild the C code." raise FileNotFoundError(message) from ex def __del__(self): self.close()
[docs] def close(self): """ Finalises and closes the database. """ try: if self._db is not None: self._db.close() except Exception: # pylint: disable=broad-except pass self._db = None
def __init_db(self): """ Set up the database if required. """ self._db.row_factory = sqlite3.Row # Don't use memoryview / buffer as hard to deal with difference self._db.text_factory = str with open(_DDL_FILE, encoding="utf-8") as f: sql = f.read() self._db.executescript(sql) def __clear_db(self): with self._db: cursor = self._db.cursor() cursor.execute("DELETE FROM log") cursor.execute("UPDATE SQLITE_SEQUENCE SET SEQ=0 WHERE NAME='log'") cursor.execute("DELETE FROM file") cursor.execute( "UPDATE SQLITE_SEQUENCE SET SEQ=0 WHERE NAME='file'") cursor.execute("DELETE FROM directory") cursor.execute( "UPDATE SQLITE_SEQUENCE SET SEQ=0 WHERE NAME='directory'")
[docs] def get_directory_id(self, src_path: str, dest_path: str) -> int: """ gets the Ids for this directory. Making a new one if needed :param str src_path: :param str dest_path: :rtype: int """ with self._db: cursor = self._db.cursor() # reuse the existing if it exists for row in self._db.execute( """ SELECT directory_id FROM directory WHERE src_path = ? AND dest_path = ? LIMIT 1 """, [src_path, dest_path]): return row["directory_id"] # create a new number cursor.execute( """ INSERT INTO directory(src_path, dest_path) VALUES(?, ?) """, (src_path, dest_path)) return cursor.lastrowid
[docs] def get_file_id(self, directory_id: int, file_name: str) -> int: """ Gets the id for this file, making a new one if needed. :param int directory_id: :param str file_name: :rtype: int """ with self._db: # Make previous one as not last with self._db: cursor = self._db.cursor() cursor.execute( """ UPDATE file SET last_build = 0 WHERE directory_id = ? AND file_name = ? """, [directory_id, file_name]) # always create new one to distinguish new from old logs cursor.execute( """ INSERT INTO file( directory_id, file_name, convert_time, last_build) VALUES(?, ?, ?, 1) """, (directory_id, file_name, _timestamp())) return cursor.lastrowid
[docs] def set_log_info( self, log_level: int, line_num: int, original: str, file_id: int): """ Saves the data needed to replace a short log back to the original. :param int log_level: :param int line_num: :param str original: :param int file_id: """ with self._db: cursor = self._db.cursor() # reuse the existing number if nothing has changed cursor.execute( """ UPDATE log SET file_id = ? WHERE log_level = ? AND line_num = ? AND original = ? """, (file_id, log_level, line_num, original)) if cursor.rowcount == 0: # create a new number if anything has changed cursor.execute( """ INSERT INTO log(log_level, line_num, original, file_id) VALUES(?, ?, ?, ?) """, (log_level, line_num, original, file_id)) return cursor.lastrowid else: for row in self._db.execute( """ SELECT log_id FROM log WHERE log_level = ? AND line_num = ? AND original = ? AND file_id = ? LIMIT 1 """, (log_level, line_num, original, file_id)): return row["log_id"]
[docs] def get_log_info(self, log_id: str) -> Optional[Tuple[int, str, int, str]]: """ Gets the data needed to replace a short log back to the original. :param str log_id: The int id as a String :rtype: tuple(int, str, int, str) """ with self._db: for row in self._db.execute( """ SELECT log_level, file_name, line_num , original FROM current_file_view WHERE log_id = ? LIMIT 1 """, [log_id]): return (row["log_level"], row["file_name"], row["line_num"], row["original"]) return None
[docs] def check_original(self, original: str): """ Checks that an original log line has been added to the database. Mainly used for testing :param str original: :raises ValueError: If the original is not in the database """ with self._db: for row in self._db.execute( """ SELECT COUNT(log_id) as "counts" FROM log WHERE original = ? """, ([original])): if row["counts"] == 0: raise ValueError(f"{original} not found in database")
[docs] def get_max_log_id(self): """ Get the max id of any log message. :rtype: int """ with self._db: for row in self._db.execute( """ SELECT MAX(log_id) AS "max_id" FROM log """): return row["max_id"]