Source code for tlo.logging.reader

import json
import logging as _logging
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List

import numpy as np
import pandas as pd


[docs]class LogData: """Builds up log data for export as dictionary with dataframes""" def __init__(self): self.data: DefaultDict[str, Dict[str, Dict[str, Any]]] = defaultdict(dict) self.allowed_logs = set() self.uuid_to_module = dict()
[docs] def parse_log_line(self, log_line: str, level: int): """ Parse LogRow at desired level :param log_line: a json line from log file that can either be a header or data row :param level: matching level to add to log, other levels will not be added """ # new header line, if this is the right level, then add module and key to log with header and blank data log_data = json.loads(log_line) if 'type' in log_data and log_data['type'] == 'header': self.uuid_to_module[log_data['uuid']] = log_id = (log_data['module'], log_data['key']) if getattr(_logging, log_data['level']) >= level: self.allowed_logs.add(log_id) self.data[log_data['module']][log_data['key']] = {'header': log_data, 'values': [], 'dates': []} else: log_id = (log_data['module'], log_data['key']) = self.uuid_to_module[log_data['uuid']] # log data row if we allow this logger if log_id in self.allowed_logs: self.data[log_data['module']][log_data['key']]['dates'].append(log_data['date']) self.data[log_data['module']][log_data['key']]['values'].append(log_data['values'])
[docs] def get_log_dataframes(self) -> DefaultDict[str, Dict[str, pd.DataFrame]]: """ Converts parsed logs of dictionaries to dataframes and then returns all logs :return: dictionary of output logs with dataframes for each log key """ output_logs: DefaultDict[str, Dict[str, pd.DataFrame]] = defaultdict(dict) for module, log_data in self.data.items(): output_logs['_metadata'][module] = dict() for key, data in log_data.items(): output_logs['_metadata'][module][key] = data['header'] if list(data['header']['columns'].keys()) == ['dataframe']: output_logs[module][key] = self.parse_logged_dataframe(data['values'], data['dates']) else: output_logs[module][key] = pd.DataFrame(data['values'], columns=data['header']['columns'].keys()) output_logs[module][key].insert( 0, "date", pd.Series(data["dates"], dtype=np.dtype('datetime64[ns]')) ) # for each column, cast to the correct type if necessary for n, t in data['header']['columns'].items(): if t == "Timestamp": output_logs[module][key][n] = output_logs[module][key][n].astype('datetime64[ns]') elif t == "Categorical": output_logs[module][key][n] = output_logs[module][key][n].astype('category') elif t == "set": output_logs[module][key][n] = output_logs[module][key][n].apply(set) return output_logs
[docs] def parse_logged_dataframe(self, values: List[List[Dict[str, Dict[str, Any]]]], dates: List[str]) -> pd.DataFrame: """ Converts log data for an entire dataframe being logged into a mutli-indexed dataframe :param values: logged values :param dates: list of dates :return: Multi-indexed (log_row, df_row) dataframe """ # Convert data to {(log_row_i, df_row_i): {df_col_name_1: df_col_val_1, df_col_name_2: df_col_val_2...}} indexed_data = {(log_row_i, df_row_i): log_row[df_row_i] # Each log row as the first part of multi-index for log_row_i in range(len(values)) # each row is a list with one dictionary as the value for log_row in values[0] # the index for each row of the logged dataframe for df_row_i in log_row.keys()} # create dataframe from indexed data, and join dates based on the log row index log_date = pd.DataFrame(pd.Series(dates, name='date', dtype=np.dtype('datetime64[ns]'))) log_date.index.set_names("log_row", inplace=True) logged_df = pd.DataFrame.from_dict(indexed_data, orient='index') logged_df.index.set_names(['log_row', 'df_row'], inplace=True) return log_date.join(logged_df)