# -*- coding: utf-8 -*-
"""Read from and write to SQLite databases."""
import logging
import re
import sqlite3
from winevtrc import definitions
from winevtrc import errors
from winevtrc import resources
[docs]
class SQLite3DatabaseFile(object):
"""A SQLite database file."""
_HAS_TABLE_QUERY = (
'SELECT name FROM sqlite_master '
'WHERE type = "table" AND name = "{0:s}"')
[docs]
def __init__(self):
"""Initializes the database file."""
super(SQLite3DatabaseFile, self).__init__()
self._connection = None
self._cursor = None
self.filename = None
self.read_only = None
def _GetValues(self, cursor, table_names, column_names, condition):
"""Values generator function.
Args:
cursor (sqlite3.Cursor): SQLite database cursor.
table_names (list[str]): table names.
column_names (list[str]): column names.
condition (str): condition.
Yields:
dict[str, object]: value.
Raises:
BackendError: if the database back-end raises an exception.
"""
table_names = ', '.join(table_names)
column_names_string = ', '.join(column_names)
sql_query = f'SELECT {column_names_string:s} FROM {table_names:s}'
if condition:
sql_query = ''.join([sql_query, f' WHERE {condition:s}'])
try:
cursor.execute(sql_query)
except sqlite3.OperationalError as exception:
raise errors.BackendError(exception)
for row in cursor:
values = {}
for column_index, column_name in enumerate(column_names):
values[column_name] = row[column_index]
yield values
[docs]
def Close(self):
"""Closes the database file.
Raises:
IOError: if the database is not opened.
OSError: if the database is not opened.
"""
if not self._connection:
raise IOError('Cannot close database not opened.')
# We need to run commit or not all data is stored in the database.
self._connection.commit()
self._connection.close()
self._connection = None
self._cursor = None
self.filename = None
self.read_only = None
[docs]
def CreateTable(self, table_name, column_definitions):
"""Creates a table.
Args:
table_name (str): table name.
column_definitions (list[str]): column definitions.
Raises:
BackendError: if the database back-end raises an exception.
IOError: if the database is not opened or
if the database is in read-only mode.
OSError: if the database is not opened or
if the database is in read-only mode.
"""
if not self._connection:
raise IOError('Cannot create table database not opened.')
if self.read_only:
raise IOError('Cannot create table database in read-only mode.')
column_definitions = ', '.join(column_definitions)
try:
self._cursor.execute(
f'CREATE TABLE {table_name:s} ( {column_definitions:s} )')
except sqlite3.OperationalError as exception:
raise errors.BackendError(exception)
[docs]
def GetValues(self, table_names, column_names, condition):
"""Retrieves values from a table.
Args:
table_names (list[str]): table names.
column_names (list[str]): column names.
condition (str): condition.
Returns:
generator: values generator.
Raises:
IOError: if the database is not opened.
OSError: if the database is not opened.
"""
if not self._connection:
raise IOError('Cannot retrieve values database not opened.')
cursor = self._connection.cursor()
return self._GetValues(cursor, table_names, column_names, condition)
[docs]
def HasTable(self, table_name):
"""Determines if a specific table exists.
Args:
table_name (str): table name.
Returns:
bool: True if the table exists, false otherwise.
Raises:
BackendError: if the database back-end raises an exception.
IOError: if the database is not opened.
OSError: if the database is not opened.
"""
if not self._connection:
raise IOError('Cannot determine if table exists database not opened.')
sql_query = self._HAS_TABLE_QUERY.format(table_name)
try:
self._cursor.execute(sql_query)
has_table = bool(self._cursor.fetchone())
except sqlite3.OperationalError as exception:
raise errors.BackendError(exception)
return has_table
[docs]
def InsertValues(self, table_name, column_names, values):
"""Inserts values into a table.
Args:
table_name (str): table name.
column_names (list[str]): column names.
values (list[str]): values formatted as a string.
Raises:
BackendError: if the database back-end raises an exception.
IOError: if the database is not opened or
if the database is in read-only mode or
if an unsupported value type is encountered.
OSError: if the database is not opened or
if the database is in read-only mode or
if an unsupported value type is encountered.
"""
if not self._connection:
raise IOError('Cannot insert values database not opened.')
if self.read_only:
raise IOError('Cannot insert values database in read-only mode.')
if not values:
return
sql_values = []
for value in values:
# TODO: handle bool.
if isinstance(value, str):
# In sqlite3 the double quote is escaped with a second double quote.
value = re.sub('"', '""', value)
value = f'"{value:s}"'
elif isinstance(value, int):
value = f'{value:d}'
elif isinstance(value, float):
value = f'{value:f}'
elif value is None:
value = 'NULL'
else:
value_type = type(value)
raise IOError(f'Unsupported value type: {value_type!s}.')
sql_values.append(value)
column_names = ', '.join(column_names)
sql_values = ', '.join(sql_values)
try:
self._cursor.execute(
f'INSERT INTO {table_name:s} ( {column_names:s} ) '
f'VALUES ( {sql_values:s} )')
except sqlite3.OperationalError as exception:
raise errors.BackendError(exception)
[docs]
def Open(self, filename, read_only=False):
"""Opens the database file.
Args:
filename (str): filename of the database.
read_only (Optional[bool]): True if the database should be opened in
read-only mode. Since sqlite3 does not support a real read-only
mode we fake it by only permitting SELECT queries.
Returns:
bool: True if successful or False if not.
Raises:
BackendError: if the database back-end raises an exception.
IOError: if the database is already opened.
OSError: if the database is already opened.
"""
if self._connection:
raise IOError('Cannot open database already opened.')
self.filename = filename
self.read_only = read_only
self._connection = sqlite3.connect(filename)
if not self._connection:
return False
try:
self._cursor = self._connection.cursor()
except sqlite3.OperationalError as exception:
raise errors.BackendError(exception)
if not self._cursor:
return False
return True
[docs]
class ResourcesSQLite3DatabaseReader(object):
"""Event Log resources SQLite database reader."""
[docs]
def __init__(self):
"""Initializes the database reader."""
super(ResourcesSQLite3DatabaseReader, self).__init__()
self._database_file = SQLite3DatabaseFile()
def _GetEventLogProviderKey(self, log_source):
"""Retrieves the Event Log provider key.
Args:
log_source (str): Event Log source.
Returns:
int: an Event Log provider key or None if not available.
Raises:
IOError: if more than one value is found in the database.
OSError: if more than one value is found in the database.
"""
table_names = ['event_log_providers']
column_names = ['event_log_provider_key']
condition = f'log_source == "{log_source:s}"'
values_list = list(self._database_file.GetValues(
table_names, column_names, condition))
number_of_values = len(values_list)
if number_of_values == 0:
return None
if number_of_values == 1:
values = values_list[0]
return values['event_log_provider_key']
raise IOError('More than one value found in database.')
def _GetMessage(self, message_file_key, lcid, message_identifier):
"""Retrieves a specific message from a specific message table.
Args:
message_file_key (int): message file key.
lcid (int): language code identifier (LCID).
message_identifier (int): message identifier.
Returns:
str: the message string or None if not available.
Raises:
IOError: if more than one value is found in the database.
OSError: if more than one value is found in the database.
"""
table_name = f'message_table_{message_file_key:d}_0x{lcid:08x}'
has_table = self._database_file.HasTable(table_name)
if not has_table:
return None
column_names = ['message_string']
condition = f'message_identifier == "0x{message_identifier:08x}"'
values = list(self._database_file.GetValues(
[table_name], column_names, condition))
number_of_values = len(values)
if number_of_values == 0:
return None
if number_of_values == 1:
return values[0]['message_string']
raise IOError('More than one value found in database.')
def _GetMessageFileKeys(self, event_log_provider_key):
"""Retrieves the message file keys.
Args:
event_log_provider_key (int): the Event Log provider key.
Yields:
int: a message file key.
"""
table_names = ['message_file_per_event_log_provider']
column_names = ['message_file_key']
condition = f'event_log_provider_key == {event_log_provider_key:d}'
generator = self._database_file.GetValues(
table_names, column_names, condition)
# pylint: disable=not-an-iterable
for values in generator:
yield values['message_file_key']
def _GetMessageFilenames(self, log_source, message_file_type):
"""Retrieves the message filenames of a specific Event Log provider.
Args:
log_source (str): Event Log source.
message_file_type (str): message file type.
Returns:
list[str]: message filenames.
"""
table_names = [
'event_log_providers', 'message_file_per_event_log_provider',
'message_files']
column_names = ['message_files.path']
condition = ' AND '.join([
f'event_log_providers.log_source == "{log_source:s}"',
(f'message_file_per_event_log_provider.message_file_type == '
f'"{message_file_type:s}"'),
('event_log_providers.event_log_provider_key == '
'message_file_per_event_log_provider.event_log_provider_key'),
('message_file_per_event_log_provider.message_file_key == '
'message_files.message_file_key')])
message_filenames = []
for values in self._database_file.GetValues(
table_names, column_names, condition):
message_filename = values['message_files.path']
message_filenames.append(message_filename)
return message_filenames
def _GetMessages(self, message_file_key, lcid):
"""Retrieves the messages of a specific message table.
Args:
message_file_key (int): message file key.
lcid (int): language code identifier (LCID).
Yields:
tuple[int, str]: message identifier and message string.
"""
table_name = f'message_table_{message_file_key:d}_0x{lcid:08x}'
has_table = self._database_file.HasTable(table_name)
if has_table:
column_names = ['message_identifier', 'message_string']
condition = ''
for values in self._database_file.GetValues(
[table_name], column_names, condition):
yield values['message_identifier'], values['message_string']
[docs]
def Close(self):
"""Closes the database reader."""
self._database_file.Close()
[docs]
def GetEventLogProviders(self):
"""Retrieves the Event Log providers.
Yields:
EventLogProvider: an Event Log provider.
"""
table_names = ['event_log_providers']
column_names = ['log_source', 'provider_guid']
condition = ''
event_log_providers = []
for values in self._database_file.GetValues(
table_names, column_names, condition):
event_log_provider = resources.EventLogProvider()
event_log_provider.identifier = values['provider_guid']
event_log_provider.log_sources.append(values['log_source'])
event_log_providers.append(event_log_provider)
for event_log_provider in event_log_providers:
message_filenames = self._GetMessageFilenames(
event_log_provider.log_source,
definitions.MESSAGE_FILE_TYPE_CATEGORY)
event_log_provider.SetCategoryMessageFilenames(message_filenames)
message_filenames = self._GetMessageFilenames(
event_log_provider.log_source, definitions.MESSAGE_FILE_TYPE_EVENT)
event_log_provider.SetEventMessageFilenames(message_filenames)
message_filenames = self._GetMessageFilenames(
event_log_provider.log_source,
definitions.MESSAGE_FILE_TYPE_PARAMETER)
event_log_provider.SetParameterMessageFilenames(message_filenames)
yield event_log_provider
[docs]
def GetMessage(self, log_source, lcid, message_identifier):
"""Retrieves a specific message for a specific Event Log source.
Args:
log_source (str): Event Log source.
lcid (int): language code identifier (LCID).
message_identifier (int): message identifier.
Returns:
str: the message string or None if not available.
"""
event_log_provider_key = self._GetEventLogProviderKey(log_source)
if not event_log_provider_key:
return None
generator = self._GetMessageFileKeys(event_log_provider_key)
if not generator:
return None
message_string = None
for message_file_key in generator:
message_string = self._GetMessage(
message_file_key, lcid, message_identifier)
if message_string:
break
return message_string
[docs]
def GetMessages(self, log_source, lcid):
"""Retrieves the messages of a specific Event Log source.
Args:
log_source (str): Event Log source.
lcid (int): language code identifier (LCID).
Yields:
tuple[int, str]: message identifier and message string.
"""
event_log_provider_key = self._GetEventLogProviderKey(log_source)
if event_log_provider_key:
for message_file_key in self._GetMessageFileKeys(event_log_provider_key):
for message_identifier, message_string in self._GetMessages(
message_file_key, lcid):
yield message_identifier, message_string
[docs]
def Open(self, filename):
"""Opens the database reader.
Args:
filename (str): filename of the database.
Returns:
bool: True if successful or False if not.
"""
return self._database_file.Open(filename, read_only=True)
[docs]
class ResourcesSQLite3DatabaseWriter(object):
"""Event Log resources SQLite database writer."""
# Message string specifiers that are considered white space.
_WHITE_SPACE_SPECIFIER_RE = re.compile(r'(%[0b]|[\r\n])')
# Message string specifiers that expand to text.
_TEXT_SPECIFIER_RE = re.compile(r'%([ .!%nrt])')
# Curly brackets in a message string.
_CURLY_BRACKETS = re.compile(r'([\{\}])')
# Message string specifiers that expand to a variable place holder.
_PLACE_HOLDER_SPECIFIER_RE = re.compile(r'%([1-9][0-9]?)[!]?[s]?[!]?')
[docs]
def __init__(self, string_format='wrc'):
"""Initializes the database writer.
Args:
string_format (Optional[str]): string format. The default is the Windows
Resource (wrc) format.
"""
super(ResourcesSQLite3DatabaseWriter, self).__init__()
self._database_file = SQLite3DatabaseFile()
self._string_format = string_format
def _GetEventLogProviderKey(self, event_log_provider):
"""Retrieves the key of an Event Log provider.
Args:
event_log_provider (EventLogProvider): Event Log provider.
Returns:
int: the Event Log provider key or None if no such value.
Raises:
IOError: if more than one value is found in the database.
OSError: if more than one value is found in the database.
"""
table_names = ['event_log_providers']
column_names = ['event_log_provider_key']
condition = f'log_source = "{event_log_provider.log_source:s}"'
values_list = list(self._database_file.GetValues(
table_names, column_names, condition))
number_of_values = len(values_list)
if number_of_values == 0:
return None
if number_of_values == 1:
values = values_list[0]
return values['event_log_provider_key']
raise IOError('More than one value found in database.')
def _GetMessageFileKey(self, message_file):
"""Retrieves the key of a message file.
Args:
message_file (ExportMessageFile): message file.
Returns:
int: the message file key or None if no such value.
Raises:
IOError: if more than one value is found in the database.
OSError: if more than one value is found in the database.
"""
table_names = ['message_files']
column_names = ['message_file_key']
condition = f'LOWER(path) = LOWER("{message_file.windows_path:s}")'
values_list = list(self._database_file.GetValues(
table_names, column_names, condition))
number_of_values = len(values_list)
if number_of_values == 0:
return None
if number_of_values == 1:
values = values_list[0]
return values['message_file_key']
raise IOError('More than one value found in database.')
def _GetMessageFileKeyByPath(self, message_filename):
"""Retrieves the key of a message file for a specific path.
Args:
message_filename (str): message filename.
Returns:
int: the message file key or None if no such value.
Raises:
IOError: if more than one value is found in the database.
OSError: if more than one value is found in the database.
"""
table_names = ['message_files']
column_names = ['message_file_key']
condition = f'LOWER(path) = LOWER("{message_filename:s}")'
values_list = list(self._database_file.GetValues(
table_names, column_names, condition))
number_of_values = len(values_list)
if number_of_values == 0:
return None
if number_of_values == 1:
values = values_list[0]
return values['message_file_key']
raise IOError('More than one value found in database.')
def _ReformatMessageString(self, message_string):
"""Reformats the message string.
Args:
message_string (str): message string.
Returns:
str: message string in Python format() (PEP 3103) style or None
if not available.
"""
def PlaceHolderSpecifierReplacer(match_object):
"""Replaces message string place holders into Python format() style."""
expanded_groups = []
for group in match_object.groups():
try:
place_holder_number = int(group, 10) - 1
expanded_group = f'{{{place_holder_number:d}:s}}'
except ValueError:
expanded_group = group
expanded_groups.append(expanded_group)
return ''.join(expanded_groups)
if not message_string:
return None
message_string = self._WHITE_SPACE_SPECIFIER_RE.sub(r'', message_string)
message_string = self._TEXT_SPECIFIER_RE.sub(r'\\\1', message_string)
message_string = self._CURLY_BRACKETS.sub(r'\1\1', message_string)
return self._PLACE_HOLDER_SPECIFIER_RE.sub(
PlaceHolderSpecifierReplacer, message_string)
def _WriteMessage(
self, message_file, language_identifier, message_identifier,
message_string, table_name, has_table):
"""Writes a message to a specific message table.
Args:
message_file (ExportMessageFile): message file.
language_identifier (int): language identifier (LCID).
message_identifier (int): message identifier.
message_string (str): message string.
table_name (str): name of the table.
has_table (bool): True if the table previously existed in the database.
"""
column_names = ['message_identifier', 'message_string']
if not has_table:
insert_values = True
else:
condition = f'message_identifier = "{message_identifier:s}"'
values_list = list(self._database_file.GetValues(
[table_name], column_names, condition))
number_of_values = len(values_list)
if number_of_values == 1:
values = values_list[0]
stored_message_string = values['message_string']
if message_string != stored_message_string:
logging.warning((
f'Message string mismatch for LCID: {language_identifier:s}, '
f'file version: {message_file.file_version:s}, message '
f'identifier: {message_identifier:s}.\nFound: '
f'{message_string:s}\nStored: {stored_message_string:s}\n'))
elif number_of_values != 0:
logging.warning((
f'More than one message string found for LCID: '
f'{language_identifier:s}, file version: '
f'{message_file.file_version:s}, message identifier: '
f'{message_identifier:s}.'))
# TODO: warn if new message has been found.
insert_values = False
if insert_values:
if self._string_format == 'pep3101':
message_string = self._ReformatMessageString(message_string)
values = [message_identifier, message_string]
self._database_file.InsertValues(table_name, column_names, values)
def _WriteMessageTableLanguage(self, message_file_key, language_identifier):
"""Writes a message table language.
Args:
message_file_key (int): message file key.
language_identifier (int): language identifier (LCID).
"""
table_name = 'message_table_languages'
column_names = ['lcid', 'message_file_key']
has_table = self._database_file.HasTable(table_name)
if not has_table:
column_definitions = ['lcid TEXT', 'message_file_key INT']
self._database_file.CreateTable(table_name, column_definitions)
if not has_table:
insert_values = True
else:
condition = (
f'lcid = "{language_identifier:s}" AND '
f'message_file_key = "{message_file_key:d}"')
values_list = list(self._database_file.GetValues(
[table_name], column_names, condition))
number_of_values = len(values_list)
insert_values = number_of_values == 0
if insert_values:
values = [language_identifier, message_file_key]
self._database_file.InsertValues(table_name, column_names, values)
[docs]
def Close(self):
"""Closes the database writer."""
self._database_file.Close()
[docs]
def Open(self, filename):
"""Opens the database writer.
Args:
filename (str): filename of the database.
Returns:
bool: True if successful or False if not.
"""
return self._database_file.Open(filename)
[docs]
def WriteEventLogProvider(self, event_log_provider):
"""Writes the Event Log provider.
Args:
event_log_provider (EventLogProvider): Event Log provider.
"""
table_name = 'event_log_providers'
column_names = ['log_source', 'provider_guid']
has_table = self._database_file.HasTable(table_name)
if not has_table:
column_definitions = [
'event_log_provider_key INTEGER PRIMARY KEY AUTOINCREMENT',
'log_source TEXT', 'provider_guid TEXT']
self._database_file.CreateTable(table_name, column_definitions)
insert_values = True
else:
if event_log_provider.identifier:
condition = f'provider_guid = "{event_log_provider.identifier:s}"'
elif event_log_provider.log_source:
condition = f'log_source = "{event_log_provider.log_source:s}"'
values_list = list(self._database_file.GetValues(
[table_name], column_names, condition))
number_of_values = len(values_list)
# TODO: check if more than 1 result.
insert_values = number_of_values == 0
if insert_values:
values = [
event_log_provider.log_source, event_log_provider.identifier]
self._database_file.InsertValues(table_name, column_names, values)
[docs]
def WriteMessageFile(self, message_file):
"""Writes a message file.
Args:
message_file (ExportMessageFile): message file.
"""
table_name = 'message_files'
column_names = ['path']
has_table = self._database_file.HasTable(table_name)
if not has_table:
column_definitions = [
'message_file_key INTEGER PRIMARY KEY AUTOINCREMENT', 'path TEXT']
self._database_file.CreateTable(table_name, column_definitions)
if not has_table:
insert_values = True
else:
condition = f'LOWER(path) = LOWER("{message_file.windows_path:s}")'
values_list = list(self._database_file.GetValues(
[table_name], column_names, condition))
number_of_values = len(values_list)
insert_values = number_of_values == 0
if insert_values:
values = [message_file.windows_path]
self._database_file.InsertValues(table_name, column_names, values)
[docs]
def WriteMessageFilesPerEventLogProvider(
self, event_log_provider, message_filename, message_file_type):
"""Writes the message files used by an Event Log provider.
Args:
event_log_provider (EventLogProvider): Event Log provider.
message_filename (str): message filename.
message_file_type (str): message file type.
"""
table_name = 'message_file_per_event_log_provider'
column_names = [
'message_file_key', 'message_file_type', 'event_log_provider_key']
event_log_provider_key = self._GetEventLogProviderKey(event_log_provider)
if event_log_provider_key is None:
logging.warning((
f'Missing Event Log provider key for: '
f'{event_log_provider.log_source:s}'))
message_file_key = self._GetMessageFileKeyByPath(message_filename)
if message_file_key is None:
logging.warning(f'Missing message file key for: {message_filename:s}')
return
has_table = self._database_file.HasTable(table_name)
if not has_table:
column_definitions = [
'message_file_key INTEGER', 'message_file_type TEXT',
'event_log_provider_key INTEGER']
self._database_file.CreateTable(table_name, column_definitions)
insert_values = True
else:
condition = (
f'message_file_key = {message_file_key:d} AND '
f'message_file_type = "{message_file_type:s}" AND '
f'event_log_provider_key = {event_log_provider_key:d}')
values_list = list(self._database_file.GetValues(
[table_name], column_names, condition))
number_of_values = len(values_list)
# TODO: check if more than 1 result.
insert_values = number_of_values == 0
if insert_values:
values = [message_file_key, message_file_type, event_log_provider_key]
self._database_file.InsertValues(table_name, column_names, values)
[docs]
def WriteMessageTable(self, message_file, message_table):
"""Writes a message table for a specific language identifier.
Args:
message_file (ExportMessageFile): message file.
message_table (MessageTable): message table.
"""
if message_table.message_strings:
message_file_key = self._GetMessageFileKey(message_file)
if message_file_key is None:
logging.warning(
f'Missing message file key for: {message_file.windows_path:s}')
table_name = f'message_table_{message_file_key:d}_{message_table.lcid:s}'
has_table = self._database_file.HasTable(table_name)
if not has_table:
column_definitions = [
'message_identifier TEXT', 'message_string TEXT']
self._database_file.CreateTable(table_name, column_definitions)
message_strings = message_table.message_strings
for message_identifier, message_string in message_strings.items():
self._WriteMessage(
message_file, message_table.lcid, f'0x{message_identifier:08x}',
message_string, table_name, has_table)
self._WriteMessageTableLanguage(message_file_key, message_table.lcid)