# Source code for winevtrc.exporter

"""Windows Event Log message resource exporter."""

import abc
import difflib
import logging
import os

from acstore import sqlite_store

from winevtrc import resources
from winevtrc import storage  # pylint: disable=unused-import


class ExportEventLogProvider:
    """Event Log provider as grouped for export.

    Attributes:
        name (str): name.
        providers_with_versions (list[tuple[EventLogProvider, list[str]]]):
            Event Log providers with corresponding Windows versions.
    """

    def __init__(self, name):
        """Initializes an Event Log provider.

        Args:
            name (str): name.
        """
        super().__init__()
        # Starts out empty; every instance gets its own list.
        self.providers_with_versions = list()
        self.name = name
class MessageFileAttributeContainerStore(sqlite_store.SQLiteAttributeContainerStore):
    """Message file attribute container store.

    Attributes:
        format_version (int): storage format version.
        name (str): name of the message database.
        serialization_format (str): serialization format.
        windows_path (str): Windows path.
    """

    # NOTE(review): format_version and serialization_format are not assigned
    # in this class; presumably they are provided by the base store - confirm.

    def __init__(self, attribute_container):
        """Initializes a message file attribute container store.

        Args:
            attribute_container (WinevtResourcesMessageFileDatabase): message
                file database attribute container.
        """
        super().__init__()

        # Strip ".db" from the database filename, but only when the suffix is
        # actually present, so that other filenames are not truncated.
        database_filename = attribute_container.database_filename
        if database_filename.lower().endswith(".db"):
            database_filename = database_filename[:-3]

        self.name = database_filename
        self.windows_path = attribute_container.windows_path
class ExporterOutputWriter:
    """Exporter output writer.

    Base class for output writers. Close, Open, WriteEventLogProvider and
    WriteMessageFile are marked abstract and are expected to be provided by
    subclasses.
    """

    # NOTE: this class does not use abc.ABCMeta, hence the abc.abstractmethod
    # markers below are not enforced when the class is instantiated.

    # ANSI escape sequences used to highlight differences on the terminal.
    _COLOR_END = "\x1b[0m"
    _COLOR_GREEN = "\x1b[0;32m"
    _COLOR_RED = "\x1b[0;31m"

    def _DiffLine(self, first_line, second_line):
        """Determines the character-level differences of 2 lines.

        Args:
            first_line (str): line of the message string.
            second_line (str): line of the other message string.

        Returns:
            tuple[str, str]: first line with removed text marked in red and
                second line with added text marked in green.
        """
        first_output = []
        second_output = []

        line_matcher = difflib.SequenceMatcher(
            None, first_line, second_line, autojunk=False
        )
        for line_opcode, a0, a1, b0, b1 in line_matcher.get_opcodes():
            if line_opcode == "equal":
                text = first_line[a0:a1]
                first_output.append(text)
                second_output.append(text)
                continue

            # Dispatch on the opcode of the line matcher, not on the opcode
            # of the enclosing message string matcher.
            if line_opcode in ("insert", "replace"):
                text = second_line[b0:b1]
                second_output.append(
                    f"{self._COLOR_GREEN:s}{text:s}{self._COLOR_END:s}"
                )
            if line_opcode in ("delete", "replace"):
                text = first_line[a0:a1]
                first_output.append(
                    f"{self._COLOR_RED:s}{text:s}{self._COLOR_END:s}"
                )

        return "".join(first_output), "".join(second_output)

    def _DiffMessageString(
        self,
        file_version,
        language_identifier,
        message_identifier,
        message_string,
        other_message_string,
    ):
        """Determines differences in 2 message strings.

        A warning is logged and the differences are printed to stdout, with
        added text in green and removed text in red.

        Args:
            file_version (str): file version.
            language_identifier (int): language identifier.
            message_identifier (int): message identifier.
            message_string (list[str]): message string.
            other_message_string (list[str]): other message string.
        """
        matcher = difflib.SequenceMatcher(None, message_string, other_message_string)
        opcodes = matcher.get_opcodes()
        if opcodes:
            logging.warning(
                f"Found duplicate alternating message string: "
                f"0x{message_identifier:08x} in LCID: 0x{language_identifier:08x} "
                f"and version: {file_version:s}."
            )

        for opcode, first_start, first_end, second_start, second_end in opcodes:
            first_lines = message_string[first_start:first_end]
            second_lines = other_message_string[second_start:second_end]

            if opcode == "insert":
                for line in second_lines:
                    print(f"{self._COLOR_GREEN:s}+ {line:s}{self._COLOR_END:s}")

            elif opcode == "delete":
                for line in first_lines:
                    print(f"{self._COLOR_RED:s}- {line:s}{self._COLOR_END:s}")

            elif opcode == "replace":
                # A replaced span can have a different number of lines on
                # either side: compare the lines both sides have pairwise and
                # report the remainder as plain insertions or deletions.
                number_of_pairs = min(len(first_lines), len(second_lines))

                for first_line, second_line in zip(
                    first_lines[:number_of_pairs], second_lines[:number_of_pairs]
                ):
                    first_output, second_output = self._DiffLine(
                        first_line, second_line
                    )
                    print(f"+ {second_output:s}")
                    print(f"- {first_output:s}")

                for line in second_lines[number_of_pairs:]:
                    print(f"{self._COLOR_GREEN:s}+ {line:s}{self._COLOR_END:s}")

                for line in first_lines[number_of_pairs:]:
                    print(f"{self._COLOR_RED:s}- {line:s}{self._COLOR_END:s}")

    @abc.abstractmethod
    def Close(self):
        """Closes the output writer."""

    def GetMessageTables(self, message_file):
        """Retrieves message tables.

        Message tables of consecutive file versions that contain the same
        message strings are merged into a single message table that lists
        all of these file versions. Differences between consecutive file
        versions are logged as warnings.

        Args:
            message_file (MessageFileAttributeContainerStore): message file.

        Yields:
            MessageTable: message table.
        """
        # Note that a later message table replaces an earlier one with the
        # same file version.
        extracted_message_tables_per_file_versions = {}
        for extracted_message_table in message_file.GetAttributeContainers(
            resources.MessageTableDescriptor.CONTAINER_TYPE
        ):
            extracted_message_file_identifier = (
                extracted_message_table.GetMessageFileIdentifier()
            )
            extracted_message_file = message_file.GetAttributeContainerByIdentifier(
                resources.WinevtResourcesMessageFile.CONTAINER_TYPE,
                extracted_message_file_identifier,
            )
            # Use a tuple of integers so that file versions sort numerically
            # instead of alphabetically.
            file_version_tuple = tuple(
                int(number, 10)
                for number in extracted_message_file.file_version.split(".")
            )
            extracted_message_tables_per_file_versions[file_version_tuple] = (
                extracted_message_table
            )

        message_tables = []
        for file_version_tuple, extracted_message_table in sorted(
            extracted_message_tables_per_file_versions.items()
        ):
            file_version = ".".join(f"{number:d}" for number in file_version_tuple)

            message_table = resources.MessageTable(
                extracted_message_table.language_identifier
            )
            message_table.file_versions.append(file_version)

            table_identifier = extracted_message_table.GetIdentifier()
            identifier_string = table_identifier.CopyToString()
            filter_expression = f'_message_table_identifier=="{identifier_string:s}"'

            for extracted_message_string in message_file.GetAttributeContainers(
                resources.WinevtResourcesMessageString.CONTAINER_TYPE,
                filter_expression=filter_expression,
            ):
                message_table.message_strings[
                    extracted_message_string.message_identifier
                ] = extracted_message_string.text

            message_tables.append((file_version, message_table))

        # A message file without message tables has nothing to export.
        if not message_tables:
            return

        file_version, message_table = message_tables[0]
        language_identifier = message_table.language_identifier

        # TODO: handle multiple language identifiers.

        export_message_tables = [message_table]
        for other_file_version, other_message_table in message_tables[1:]:
            if message_table.message_strings == other_message_table.message_strings:
                message_table.file_versions.append(other_file_version)
                continue

            message_identifiers = set(message_table.message_strings)
            message_identifiers.update(other_message_table.message_strings)

            # Sorted so that the warnings are logged in a stable order.
            for message_identifier in sorted(message_identifiers):
                message_string = message_table.message_strings.get(
                    message_identifier, None
                )
                other_message_string = other_message_table.message_strings.get(
                    message_identifier, None
                )
                # Note that an empty message string is treated the same as a
                # missing message string.
                if not message_string:
                    logging.warning(
                        f"Message string: 0x{message_identifier:08x} was added in "
                        f"LCID: 0x{language_identifier:08x} and version: "
                        f"{other_file_version:s}."
                    )
                elif not other_message_string:
                    logging.warning(
                        f"Message string: 0x{message_identifier:08x} was removed "
                        f"in LCID: 0x{language_identifier:08x} and version: "
                        f"{other_file_version:s}."
                    )
                elif message_string != other_message_string:
                    self._DiffMessageString(
                        file_version,
                        message_table.language_identifier,
                        message_identifier,
                        [message_string],
                        [other_message_string],
                    )

            # The differing message table becomes the one that the message
            # tables of later file versions are compared against.
            file_version = other_file_version
            message_table = other_message_table
            export_message_tables.append(message_table)

        yield from export_message_tables

    @abc.abstractmethod
    def Open(self):
        """Opens the output writer.

        Returns:
            bool: True if successful or False if not.
        """

    @abc.abstractmethod
    def WriteEventLogProvider(self, export_event_log_provider):
        """Writes an Event Log provider.

        Args:
            export_event_log_provider (ExportEventLogProvider): Event Log provider.
        """

    @abc.abstractmethod
    def WriteMessageFile(self, message_file):
        """Writes a message file.

        Args:
            message_file (MessageFileAttributeContainerStore): message file.
        """
class Exporter:
    """Exports the strings extracted from Windows EventLog message files."""

    _EVENT_PROVIDERS_DATABASE_FILENAME = "winevt-kb.db"

    @staticmethod
    def _GetLowerCaseMessageFiles(message_files):
        """Retrieves the lower case variants of message file paths.

        Args:
            message_files (list[str]): message file paths.

        Returns:
            set[str]: lower case message file paths.
        """
        return {message_file.lower() for message_file in message_files}

    def _CompareEventLogProviders(self, event_log_provider, other_event_log_provider):
        """Compares 2 Event Log providers to determine if they are equivalent.

        The comparison is not symmetric: an identifier or a list of message
        files that is not set in the Event Log provider is not compared.
        Message file paths are compared case insensitively.

        Args:
            event_log_provider (EventLogProvider): Event Log provider.
            other_event_log_provider (EventLogProvider): other Event Log provider.

        Returns:
            bool: True if the Event Log providers are equivalent, False otherwise.
        """
        if event_log_provider.identifier and (
            event_log_provider.identifier != other_event_log_provider.identifier
        ):
            return False

        for attribute_name in (
            "category_message_files",
            "event_message_files",
            "parameter_message_files",
        ):
            message_files = self._GetLowerCaseMessageFiles(
                getattr(event_log_provider, attribute_name)
            )
            other_message_files = self._GetLowerCaseMessageFiles(
                getattr(other_event_log_provider, attribute_name)
            )
            if message_files and message_files != other_message_files:
                return False

        return True

    def _ExportEventLogProviders(self, database_reader, output_writer):
        """Exports the Event Log providers from an Event Log provider database.

        Event Log providers are grouped by name, or by their first log source
        in sort order if they have no name. Equivalent providers within a
        group are merged and their Windows versions are collected.

        Args:
            database_reader (SQLiteAttributeContainerStore): Event Log provider
                attribute container store.
            output_writer (ExporterOutputWriter): output writer.
        """
        providers_per_log_source = {}
        for event_log_provider in database_reader.GetAttributeContainers(
            resources.WinevtResourcesEventLogProvider.CONTAINER_TYPE
        ):
            name = event_log_provider.name
            if not name:
                # Fall back to the first log source in sort order. A provider
                # that has neither cannot be grouped and is skipped.
                name = min(event_log_provider.log_sources, default=None)
                if name is None:
                    logging.warning(
                        "Skipping Event Log provider without name and log sources."
                    )
                    continue

            is_equivalent = False

            export_event_log_provider = providers_per_log_source.get(name)
            if not export_event_log_provider:
                export_event_log_provider = ExportEventLogProvider(name)
                providers_per_log_source[name] = export_event_log_provider
            else:
                for (
                    existing_event_log_provider,
                    windows_versions,
                ) in export_event_log_provider.providers_with_versions:
                    is_equivalent = self._CompareEventLogProviders(
                        event_log_provider, existing_event_log_provider
                    )
                    if is_equivalent:
                        windows_versions.append(event_log_provider.windows_version)
                        break

            if not is_equivalent:
                export_event_log_provider.providers_with_versions.append(
                    (event_log_provider, [event_log_provider.windows_version])
                )

        for export_event_log_provider in sorted(
            providers_per_log_source.values(),
            key=lambda export_event_log_provider: export_event_log_provider.name,
        ):
            output_writer.WriteEventLogProvider(export_event_log_provider)

    def _ExportMessageResourceFiles(self, source_path, database_reader, output_writer):
        """Exports the message files from an Event Log provider database.

        Message file databases that do not exist in the source path are
        skipped with a warning.

        Args:
            source_path (str): source path.
            database_reader (SQLiteAttributeContainerStore): Event Log provider
                attribute container store.
            output_writer (ExporterOutputWriter): output writer.
        """
        for attribute_container in database_reader.GetAttributeContainers(
            resources.WinevtResourcesMessageFileDatabase.CONTAINER_TYPE
        ):
            message_file_database_path = os.path.join(
                source_path, attribute_container.database_filename
            )
            if not os.path.exists(message_file_database_path):
                logging.warning(
                    (
                        f"Missing message file database: "
                        f"{attribute_container.database_filename:s}."
                    )
                )
                continue

            logging.info(f"Processing: {attribute_container.database_filename:s}")

            message_file = MessageFileAttributeContainerStore(attribute_container)
            message_file.Open(message_file_database_path)

            # Ensure the message file is closed when writing fails.
            try:
                output_writer.WriteMessageFile(message_file)
            finally:
                message_file.Close()

    def Export(self, source_path, output_writer):
        """Exports message resources.

        Args:
            source_path (str): source path.
            output_writer (ExporterOutputWriter): output writer.
        """
        event_providers_database_path = os.path.join(
            source_path, self._EVENT_PROVIDERS_DATABASE_FILENAME
        )
        database_reader = sqlite_store.SQLiteAttributeContainerStore()
        database_reader.Open(path=event_providers_database_path, read_only=True)

        # Ensure the database reader is closed when exporting fails.
        try:
            self._ExportEventLogProviders(database_reader, output_writer)
            self._ExportMessageResourceFiles(
                source_path, database_reader, output_writer
            )
        finally:
            database_reader.Close()