Source code for winevtrc.exporter

# -*- coding: utf-8 -*-
"""Windows Event Log message resource exporter."""

import abc
import difflib
import logging
import os

from acstore import sqlite_store

from winevtrc import resources
from winevtrc import storage  # pylint: disable=unused-import


[docs] class ExportEventLogProvider(object): """Event Log provider. Attributes: name (str): name. providers_with_versions (list[tuple[EventLogProvider, list[str]]]): Event Log providers with corresponding Windows versions. """
[docs] def __init__(self, name): """Initialized an Event Log provider. Args: name (str): name. """ super(ExportEventLogProvider, self).__init__() self.name = name self.providers_with_versions = []
[docs] class ExportMessageFile(object): """Windows Event Log message file. Attributes: message_tables (list[MessageTable]): message tables. name (str): name. windows_path (str): Windows path. """
[docs] def __init__(self, name): """Initializes the message file. Args: name (str): name. """ super(ExportMessageFile, self).__init__() self.message_tables = [] self.name = name self.windows_path = None
[docs] def GetMessageTables(self): """Retrieves the message tables. Yields: MessageTable: message table. """ yield from self.message_tables
[docs] class ExporterOutputWriter(object): """Exporter output writer."""
[docs] @abc.abstractmethod def Close(self): """Closes the output writer."""
[docs] @abc.abstractmethod def Open(self): """Opens the output writer. Returns: bool: True if successful or False if not. """
[docs] @abc.abstractmethod def WriteEventLogProvider(self, export_event_log_provider): """Writes an Event Log provider. Args: export_event_log_provider (ExportEventLogProvider): Event Log provider. """
[docs] @abc.abstractmethod def WriteMessageFile(self, message_file): """Writes a message file. Args: message_file (ExportMessageFile): message file. """
[docs] @abc.abstractmethod def WriteMessageFilesPerEventLogProvider( self, event_log_provider, message_file): """Writes a mapping between an Event Log provider and a message file. Args: event_log_provider (EventLogProvider): Event Log provider. message_file (ExportMessageFile): message file. """
[docs] class Exporter(object): """Exports the strings extracted from Windows EventLog message files.""" _EVENT_PROVIDERS_DATABASE_FILENAME = 'winevt-kb.db'
[docs] def __init__(self): """Initializes a Windows Event Log message resource exporter.""" super(Exporter, self).__init__() # TODO: refactor self._event_log_providers = {}
def _CompareEventLogProviders( self, event_log_provider, other_event_log_provider): """Compares 2 Event Log providers to determine if they are equivalent. Args: event_log_provider (EventLogProvider): Event Log provider. other_event_log_provider (EventLogProvider): other Event Log provider. Returns: bool: True if the Event Log providers are equivalent, False otherwise. """ if event_log_provider.identifier and ( event_log_provider.identifier != other_event_log_provider.identifier): return False category_message_files = { message_file.lower() for message_file in event_log_provider.category_message_files} message_files = { message_file.lower() for message_file in other_event_log_provider.category_message_files} if category_message_files and category_message_files != message_files: return False event_message_files = { message_file.lower() for message_file in event_log_provider.event_message_files} message_files = { message_file.lower() for message_file in other_event_log_provider.event_message_files} if event_message_files and event_message_files != message_files: return False parameter_message_files = { message_file.lower() for message_file in event_log_provider.parameter_message_files} message_files = { message_file.lower() for message_file in other_event_log_provider.parameter_message_files} if parameter_message_files and parameter_message_files != message_files: return False return True def _DiffMessageString( self, file_version, language_identifier, message_identifier, message_string, other_message_string): """Determines differences in 2 message strings. Args: file_version (str): file version. language_identifier (int): language identifier. message_identifier (int): message identifier. message_string (list[str]): message string. other_message_string (list[str]): other message string. """ color_green = '\x1b[0;32m' color_red = '\x1b[0;31m' color_end = '\x1b[0m' matcher = difflib.SequenceMatcher( None, message_string, other_message_string) opcodes = list(matcher.get_opcodes()) if opcodes: logging.warning(( f'Found duplicate alternating message string: ' f'0x{message_identifier:08x} in LCID: 0x{language_identifier:08x} ' f'and version: {file_version:s}.')) for opcode, first_start, first_end, second_start, second_end in opcodes: first_lines = message_string[first_start:first_end] second_lines = other_message_string[second_start:second_end] if opcode == 'insert': for line in second_lines: print(f'{color_green:s}+ {line:s}{color_end:s}') elif opcode == 'delete': for line in first_lines: print(f'{color_red:s}- {line:s}{color_end:s}') elif opcode == 'replace': for index in range(len(first_lines)): # pylint: disable=consider-using-enumerate first_line = first_lines[index] second_line = second_lines[index] first_output = [] second_output = [] line_matcher = difflib.SequenceMatcher( None, first_line, second_line, autojunk=False) for line_opcode, a0, a1, b0, b1 in line_matcher.get_opcodes(): if line_opcode == 'equal': text = first_line[a0:a1] first_output.append(text) second_output.append(text) elif opcode == 'insert': text = second_line[b0:b1] second_output.append(f'{color_green:s}{text:s}{color_end:s}') elif opcode == 'delete': text = first_line[a0:a1] first_output.append(f'{color_red:s}{text:s}{color_end:s}') elif opcode == 'replace': text = second_line[b0:b1] second_output.append(f'{color_green:s}{text:s}{color_end:s}') text = first_line[a0:a1] first_output.append(f'{color_red:s}{text:s}{color_end:s}') first_output = ''.join(first_output) second_output = ''.join(second_output) print(f'+ {second_output:s}') print(f'- {first_output:s}') def _ExportEventLogProviders(self, database_reader, output_writer): """Exports the Event Log providers from an event provider database. Args: database_reader (SQLiteAttributeContainerStore): event provider database reader. output_writer (ExporterOutputWriter): output writer. """ providers_per_log_source = {} for event_log_provider in database_reader.GetAttributeContainers( resources.EventLogProvider.CONTAINER_TYPE): name = event_log_provider.name if not name: log_sources = sorted(event_log_provider.log_sources) name = log_sources[0] is_equivalent = False export_event_log_provider = providers_per_log_source.get(name, None) if not export_event_log_provider: export_event_log_provider = ExportEventLogProvider(name) providers_per_log_source[name] = export_event_log_provider # TODO: refactor self._event_log_providers[name] = event_log_provider else: for existing_event_log_provider, windows_versions in ( export_event_log_provider.providers_with_versions): is_equivalent = self._CompareEventLogProviders( event_log_provider, existing_event_log_provider) if is_equivalent: windows_versions.append(event_log_provider.windows_version) break if not is_equivalent: export_event_log_provider.providers_with_versions.append(( event_log_provider, [event_log_provider.windows_version])) for export_event_log_provider in sorted( providers_per_log_source.values(), key=lambda export_event_log_provider: export_event_log_provider.name): output_writer.WriteEventLogProvider(export_event_log_provider) def _ExportMessageFile(self, message_file, message_file_database_path): """Exports a message file. Args: message_file (ExportMessageFile): message file. message_file_database_path (str): path of the message file database. """ database_reader = sqlite_store.SQLiteAttributeContainerStore() database_reader.Open(path=message_file_database_path, read_only=True) self._ExportMessageStrings(message_file, database_reader) database_reader.Close() def _ExportMessageFiles(self, source_path, database_reader, output_writer): """Exports the message files from an event provider database. Args: source_path (str): source path. database_reader (SQLiteAttributeContainerStore): event provider database reader. output_writer (ExporterOutputWriter): output writer. """ for descriptor in database_reader.GetAttributeContainers( resources.MessageFileDatabaseDescriptor.CONTAINER_TYPE): message_file_database_path = os.path.join( source_path, descriptor.database_filename) if not os.path.exists(message_file_database_path): logging.warning( f'Missing message file database: {descriptor.database_filename:s}.') continue logging.info(f'Processing: {descriptor.database_filename:s}') # Strip ".db" from the database filename. name = descriptor.database_filename[:-3] message_file = ExportMessageFile(name) message_file.windows_path = descriptor.message_filename self._ExportMessageFile(message_file, message_file_database_path) output_writer.WriteMessageFile(message_file) def _ExportMessageFilesPerEventLogProvider(self, output_writer): """Exports the message files used by an Event Log provider. Args: output_writer (ExporterOutputWriter): output writer. """ # TODO: refactor for event_log_provider in self._event_log_providers.values(): for message_filename in event_log_provider.event_message_files: output_writer.WriteMessageFilesPerEventLogProvider( event_log_provider, message_filename) def _ExportMessageStrings(self, message_file, database_reader): """Exports the message strings from a message file database. Args: message_file (ExportMessageFile): message file. database_reader (SQLiteAttributeContainerStore): message file database reader. """ extracted_message_tables_per_file_versions = {} for extracted_message_table in database_reader.GetAttributeContainers( resources.MessageTableDescriptor.CONTAINER_TYPE): extracted_message_file_identifier = ( extracted_message_table.GetMessageFileIdentifier()) extracted_message_file = ( database_reader.GetAttributeContainerByIdentifier( resources.MessageFileDescriptor.CONTAINER_TYPE, extracted_message_file_identifier)) # pylint: disable=consider-using-generator file_version_tuple = tuple([ int(number, 10) for number in extracted_message_file.file_version.split('.')]) extracted_message_tables_per_file_versions[file_version_tuple] = ( extracted_message_table) message_tables = [] for file_version_tuple, extracted_message_table in sorted( extracted_message_tables_per_file_versions.items()): file_version = '.'.join([f'{number:d}' for number in file_version_tuple]) message_table = resources.MessageTable( extracted_message_table.language_identifier) message_table.file_versions.append(file_version) table_identifier = extracted_message_table.GetIdentifier() identifier_string = table_identifier.CopyToString() filter_expression = f'_message_table_identifier=="{identifier_string:s}"' for extracted_message_string in database_reader.GetAttributeContainers( resources.MessageStringDescriptor.CONTAINER_TYPE, filter_expression=filter_expression): message_table.message_strings[extracted_message_string.identifier] = ( extracted_message_string.text) message_tables.append((file_version, message_table)) file_version, message_table = message_tables[0] language_identifier = message_table.language_identifier # TODO: handle multiple language identifiers. message_file.message_tables = [message_table] for other_file_version, other_message_table in message_tables[1:]: if message_table.message_strings == other_message_table.message_strings: message_table.file_versions.append(other_file_version) else: message_identifiers = set(message_table.message_strings.keys()) message_identifiers.update(set( other_message_table.message_strings.keys())) for message_identifier in message_identifiers: message_string = message_table.message_strings.get( message_identifier, None) other_message_string = other_message_table.message_strings.get( message_identifier, None) if not message_string: logging.warning(( f'Message string: 0x{message_identifier:08x} was added in ' f'LCID: 0x{language_identifier:08x} and version: ' f'{other_file_version:s}.')) elif not other_message_string: logging.warning(( f'Message string: 0x{message_identifier:08x} was removed in ' f'LCID: 0x{language_identifier:08x} and version: ' f'{other_file_version:s}.')) elif message_string != other_message_string: self._DiffMessageString( file_version, message_table.language_identifier, message_identifier, [message_string], [other_message_string]) file_version = other_file_version message_table = other_message_table message_file.message_tables.append(message_table)
[docs] def Export(self, source_path, output_writer): """Exports the strings extracted from message files. Args: source_path (str): source path. output_writer (ExporterOutputWriter): output writer. """ event_providers_database_path = os.path.join( source_path, self._EVENT_PROVIDERS_DATABASE_FILENAME) database_reader = sqlite_store.SQLiteAttributeContainerStore() database_reader.Open(path=event_providers_database_path, read_only=True) try: self._ExportEventLogProviders(database_reader, output_writer) self._ExportMessageFiles(source_path, database_reader, output_writer) finally: database_reader.Close() self._ExportMessageFilesPerEventLogProvider(output_writer)