Source code for winevtrc.eventlog_providers

# -*- coding: utf-8 -*-
"""Windows Event Log providers collector."""

import logging

from winevtrc import resources


[docs]class EventLogProvidersCollector(object): """Windows Event Log providers collector.""" _SERVICES_EVENTLOG_KEY_PATH = ( 'HKEY_LOCAL_MACHINE\\System\\CurrentControlSet\\Services\\EventLog') _WINEVT_PUBLISHERS_KEY_PATH = ( 'HKEY_LOCAL_MACHINE\\Software\\Microsoft\\Windows\\CurrentVersion\\' 'WINEVT\\Publishers') def _CollectEventLogProviders( self, services_eventlog_key, winevt_publishers_key): """Collects Windows Event Log providers. Args: services_eventlog_key (dfwinreg.WinRegistryKey): a Services\\EventLog Windows Registry. winevt_publishers_key (dfwinreg.WinRegistryKey): a WINEVT\\Publishers Windows Registry. Yields: EventLogProvider: an Event Log provider. """ event_log_providers_per_identifier = {} event_log_providers_per_log_source = {} for event_log_provider in self._CollectEventLogProvidersFromServicesKey( services_eventlog_key): log_source = event_log_provider.log_sources[0] existing_event_log_provider = event_log_providers_per_identifier.get( event_log_provider.identifier, None) if existing_event_log_provider: if log_source not in existing_event_log_provider.log_sources: existing_event_log_provider.log_sources.append(log_source) existing_event_log_provider.category_message_files.update( event_log_provider.category_message_files) existing_event_log_provider.event_message_files.update( event_log_provider.event_message_files) existing_event_log_provider.parameter_message_files.update( event_log_provider.parameter_message_files) continue if log_source in event_log_providers_per_log_source: logging.warning(( 'Found multiple definitions for Event Log provider: ' '{0:s}').format(log_source)) continue event_log_providers_per_log_source[log_source] = event_log_provider if event_log_provider.identifier: event_log_providers_per_identifier[event_log_provider.identifier] = ( event_log_provider) for event_log_provider in self._CollectEventLogProvidersFromPublishersKeys( winevt_publishers_key): log_source = event_log_provider.log_sources[0] existing_event_log_provider = event_log_providers_per_log_source.get( log_source, None) if not existing_event_log_provider: existing_event_log_provider = event_log_providers_per_identifier.get( event_log_provider.identifier, None) if existing_event_log_provider: if log_source not in existing_event_log_provider.log_sources: existing_event_log_provider.log_sources.append(log_source) if existing_event_log_provider: existing_event_log_provider.event_message_files.update( event_log_provider.event_message_files) if not existing_event_log_provider.identifier: existing_event_log_provider.identifier = event_log_provider.identifier elif existing_event_log_provider.identifier != ( event_log_provider.identifier): existing_event_log_provider.additional_identifier = ( existing_event_log_provider.identifier) existing_event_log_provider.identifier = event_log_provider.identifier else: event_log_providers_per_log_source[log_source] = event_log_provider event_log_providers_per_identifier[event_log_provider.identifier] = ( event_log_provider) for _, event_log_provider in sorted( event_log_providers_per_log_source.items()): message_files = set() paths_lower = set() for path in event_log_provider.category_message_files: path_lower = path.lower() if path_lower not in paths_lower: paths_lower.add(path_lower) message_files.add(path) event_log_provider.category_message_files = message_files message_files = set() paths_lower = set() for path in event_log_provider.event_message_files: path_lower = path.lower() if path_lower not in paths_lower: paths_lower.add(path_lower) message_files.add(path) event_log_provider.event_message_files = message_files message_files = set() paths_lower = set() for path in event_log_provider.parameter_message_files: path_lower = path.lower() if path_lower not in paths_lower: paths_lower.add(path_lower) message_files.add(path) event_log_provider.parameter_message_files = message_files yield event_log_provider def _CollectEventLogProvidersFromPublishersKeys(self, winevt_publishers_key): """Collects Windows Event Log providers from a WINEVT publishers key. Args: winevt_publishers_key (dfwinreg.WinRegistryKey): WINEVT publishers key. Yield: EventLogProvider: Event Log provider. """ if winevt_publishers_key: for guid_key in winevt_publishers_key.GetSubkeys(): provider_identifier = guid_key.name.lower() log_source = self._GetValueFromKey(guid_key, '', default_value='') event_log_provider = resources.EventLogProvider( provider_identifier, log_source, '') event_log_provider.event_message_files = ( self._GetMessageFilePathsFromKey(guid_key, 'MessageFileName')) yield event_log_provider def _CollectEventLogProvidersFromServicesKey(self, services_eventlog_key): """Collects Windows Event Log providers from a services Event Log key. Args: services_eventlog_key (dfwinreg.WinRegistryKey): services Event Log key. Yield: EventLogProvider: Event Log provider. """ if services_eventlog_key: for log_type_key in services_eventlog_key.GetSubkeys(): for provider_key in log_type_key.GetSubkeys(): provider_identifier = self._GetValueFromKey( provider_key, 'ProviderGuid') if provider_identifier: provider_identifier = provider_identifier.lower() log_source = provider_key.name log_type = log_type_key.name event_log_provider = resources.EventLogProvider( provider_identifier, log_source, log_type) event_log_provider.category_message_files = ( self._GetMessageFilePathsFromKey( provider_key, 'CategoryMessageFile')) event_log_provider.event_message_files = ( self._GetMessageFilePathsFromKey( provider_key, 'EventMessageFile')) event_log_provider.parameter_message_files = ( self._GetMessageFilePathsFromKey( provider_key, 'ParameterMessageFile')) yield event_log_provider def _GetMessageFilePathsFromKey(self, registry_key, value_name): """Retrieves a value as a list of message file paths. Args: registry_key (dfwinreg.WinRegistryKey): Windows Registry key. value_name (str): name of the value. Returns: set[str]: paths of message files. """ message_files = set() registry_value = registry_key.GetValueByName(value_name) if registry_value: value_string = registry_value.GetDataAsObject() for path in value_string.split(';'): path = path.strip() if path: message_files.add(path) return message_files def _GetValueFromKey(self, registry_key, value_name, default_value=None): """Retrieves a value from a Registry value. Args: registry_key (dfwinreg.WinRegistryKey): Windows Registry key. value_name (str): name of the value. default_value (Optional[str]): default value. Returns: str: value or the default value if not available. """ if not registry_key: return default_value registry_value = registry_key.GetValueByName(value_name) if not registry_value: return default_value return registry_value.GetDataAsObject()
[docs] def Collect(self, registry): """Collects Windows Event Log providers from a Windows Registry. Args: registry (dfwinreg.WinRegistry): Windows Registry. Returns: generator[EventLogProvider]: Event Log provider generator. """ # TODO: add support to collect Event Log providers from all control sets. services_eventlog_key = registry.GetKeyByPath( self._SERVICES_EVENTLOG_KEY_PATH) winevt_publishers_key = registry.GetKeyByPath( self._WINEVT_PUBLISHERS_KEY_PATH) return self._CollectEventLogProviders( services_eventlog_key, winevt_publishers_key)