# Source code for tlo.lm

import ast
import builtins
import numbers
from enum import Enum, auto
from math import prod
from typing import Any, Callable, Dict, Optional, Tuple, Union

import numpy as np
import pandas as pd
from pandas.core.computation.parsing import clean_column_name

from tlo import logging

logger = logging.getLogger(__name__)

class Predictor(object):
    """A single predictor term of a ``LinearModel``.

    A predictor is either a list of ``(condition, coefficient)`` pairs built up
    with ``when`` / ``otherwise``, or a callback function set with ``apply``.
    """

    def __init__(
        self,
        property_name: Optional[str] = None,
        external: bool = False,
        conditions_are_mutually_exclusive: Optional[bool] = None,
        conditions_are_exhaustive: Optional[bool] = False,
    ):
        """A Predictor variable for the regression model.

        :param property_name: A property of the population dataframe e.g. age,
            sex, etc. or if ``external=True`` the name of the external property
            that will be passed as a keyword argument to the
            ``LinearModel.predict`` method.
        :param external: Whether the named property is external (``True``) and
            so will be passed as a keyword argument to the
            ``LinearModel.predict`` method, or is a property of the population
            dataframe (``False``).
        :param conditions_are_mutually_exclusive: Whether the set of conditions
            that are declared for this predictor are all mutually exclusive,
            that is, for any pair of conditions, one condition evaluating to
            ``True`` implies the other must evaluate to ``False``. If this is
            declared to be the case a more efficient method of evaluation will
            be used in ``LinearModel.predict``. Note however that the validity
            of this declaration will not be checked so if this is set to
            ``True`` for predictors with non-mutually exclusive conditions, the
            model output will be erroneous.
        :param conditions_are_exhaustive: Whether the set of conditions that
            are declared for this predictor are all exhaustive, that is at
            least one condition will always be ``True`` irrespective of the
            value of the property. If this is declared to be the case, a more
            efficient method of evaluation may be used in
            ``LinearModel.predict``, though if a catch-all ``otherwise``
            condition is included this flag will provide no benefit. Note that
            the validity of this declaration will not be checked so if this is
            set to ``True`` for predictors with non-exhaustive conditions, the
            model output will be erroneous.
        """
        self.property_name = property_name

        # If this is a property that is not part of the population dataframe
        # the name is wrapped in double underscores to mark it as external
        if external:
            assert property_name is not None, "Can't have an unnamed external predictor"
            self.property_name = f'__{self.property_name}__'

        self.conditions = list()
        self.callback = None
        self.has_otherwise = False
        self.conditions_are_mutually_exclusive = conditions_are_mutually_exclusive
        self.conditions_are_exhaustive = conditions_are_exhaustive

    def when(self, condition: Union[str, float, bool], value: float) -> 'Predictor':
        """Add a coefficient ``value`` applied where ``condition`` holds.

        :param condition: Condition on the property (or, for an unnamed
            predictor, a condition used literally).
        :param value: Coefficient applied where the condition is true.
        :return: This predictor, to allow chaining.
        """
        assert self.callback is None, "Can't use `when` on Predictor with function"
        return self._coeff(condition=condition, coefficient=value)

    def otherwise(self, value: float) -> 'Predictor':
        """Add a catch-all coefficient ``value`` for rows not otherwise matched.

        :param value: Coefficient applied where no other condition matched.
        :return: This predictor, to allow chaining.
        """
        assert self.property_name is not None, "Can't use `otherwise` condition on unnamed Predictor"
        assert self.callback is None, "Can't use `otherwise` on Predictor with function"
        return self._coeff(coefficient=value)

    def apply(self, callback: Callable[[Any], float]) -> 'Predictor':
        """Use ``callback`` to compute the coefficient from the property value.

        :param callback: Function mapping a property value to a coefficient.
        :return: This predictor, to allow chaining.
        """
        assert self.property_name is not None, "Can't use `apply` on unnamed Predictor"
        assert len(self.conditions) == 0, "Can't specify `apply` on Predictor with when/otherwise conditions"
        assert self.callback is None, "Can't specify more than one callback for a Predictor"
        self.callback = callback
        return self

    def _coeff(self, *, coefficient, condition=None) -> 'Predictor':
        """Adds the coefficient for the Predictor.

        The arguments can be two:
            `coeff(condition, value)` where the condition evaluates the
            property value to true/false
            `coeff(value)` where the value is given to all unconditioned values
            of the property

        The second style (unconditioned value) only makes sense after one or
        more conditioned values.

        :raises RuntimeError: If ``condition`` is of an unhandled type.
        """
        # If there isn't a property name
        if self.property_name is None:
            # We use the supplied condition literally
            self.conditions.append((condition, coefficient))
            return self

        # Otherwise, the condition is applied on a specific property
        if isinstance(condition, str):
            # Handle either a complex condition (begins with an operator) or
            # implicit equality. Slicing rather than indexing means an empty
            # string is treated as an equality test against "" instead of
            # raising IndexError.
            if condition[:1] in ('!', '=', '<', '>', '~', '(', '.'):
                parsed_condition = f'({self.property_name}{condition})'
            elif condition.isnumeric():
                # numeric values don't need to be quoted
                parsed_condition = f'({self.property_name} == {condition})'
            else:
                parsed_condition = f'({self.property_name} == "{condition}")'
        elif isinstance(condition, bool):
            # bool must be tested before numbers.Number as bool is a Number
            if condition:
                parsed_condition = f'({self.property_name} == True)'
            else:
                parsed_condition = f'({self.property_name} == False)'
        elif isinstance(condition, numbers.Number):
            parsed_condition = f'({self.property_name} == {condition})'
        elif condition is None:
            assert not self.has_otherwise, "You can only give one unconditioned value to predictor"
            self.has_otherwise = True
            parsed_condition = None
        else:
            raise RuntimeError(f"Unhandled condition: {condition}")

        self.conditions.append((parsed_condition, coefficient))
        return self

    def __str__(self):
        """Return a human-readable listing of the conditions or the callback."""
        name = self.property_name
        # External names are stored wrapped as '__name__'; remove exactly that
        # wrapping (str.strip would also eat underscores belonging to the name)
        if name and len(name) > 4 and name.startswith('__') and name.endswith('__'):
            name = f'{name[2:-2]} (external)'

        if self.callback:
            return f"{name} -> callback({self.callback})"

        out = []
        previous_condition = None
        for condition, value in self.conditions:
            if condition is None:
                # Pad to align with the previous condition; there may be none
                # if `otherwise` was the first condition declared
                padding = " " * len(previous_condition or "")
                out.append(f'{padding} -> {value} (otherwise)')
            else:
                out.append(f"{condition} -> {value}")
                previous_condition = condition
        return "\n ".join(out)
class LinearModelType(Enum):
    """
    The type of model specifies how the results from the predictor are combined:

    'additive' -> adds the effect_sizes from the predictors
    'logistic' -> multiplies the effect_sizes from the predictors and applies
        the transform x/(1+x) [Thus, the intercept can be taken to be an Odds
        and effect_sizes Odds Ratios, and the prediction is a probability.]
    'multiplicative' -> multiplies the effect_sizes from the predictors
    """
    ADDITIVE = auto()
    LOGISTIC = auto()
    MULTIPLICATIVE = auto()
    # the 'custom' is used internally by the custom() method
    CUSTOM = auto()
class LinearModel(object):
    """A model combining an intercept with the effects of ``Predictor`` terms.

    How the terms are combined (summed, multiplied, or multiplied and then
    transformed to a probability) is determined by the ``LinearModelType``.
    """

    def __init__(
        self,
        lm_type: LinearModelType,
        intercept: Union[float, int],
        *predictors: Predictor
    ):
        """A linear model has an intercept and zero or more ``Predictor`` variables.

        :param lm_type: Model type to use.
        :param intercept: Intercept term for the model.
        :param *predictors: Any ``Predictor`` instances to use in computing output.
        """
        # isinstance rather than `in LinearModelType`: the containment check
        # raises TypeError for non-members on older Python versions and accepts
        # raw member values (e.g. 1) on Python 3.12+
        assert isinstance(lm_type, LinearModelType), (
            "Model should be one of the prescribed LinearModelTypes"
        )
        self._lm_type = lm_type

        assert isinstance(intercept, (float, int)), (
            "Intercept is not specified or wrong type."
        )
        assert np.isfinite(intercept), "Intercept must not be NaN or infinite"
        self._intercept = intercept

        # Store predictors as tuple and expose via read-only property to prevent
        # updates after model initialisation
        self._predictors = tuple(predictors)
        non_predictors = [p for p in self._predictors if not isinstance(p, Predictor)]
        assert len(non_predictors) == 0, (
            f"One or more predictors are of invalid type: {non_predictors}"
        )
        self._parse_predictors()

    @property
    def lm_type(self) -> LinearModelType:
        """The model type."""
        return self._lm_type

    @property
    def intercept(self) -> Union[float, int]:
        """The intercept value for the model."""
        return self._intercept

    @property
    def predictors(self) -> Tuple[Predictor, ...]:
        """The predictors used in calculating the model output."""
        return self._predictors

    @staticmethod
    def multiplicative(*predictors: Predictor):
        """Returns a multiplicative LinearModel with intercept=1.0

        :param predictors: One or more Predictor objects defining the model
        """
        return LinearModel(LinearModelType.MULTIPLICATIVE, 1.0, *predictors)

    @staticmethod
    def custom(predict_function, **kwargs):
        """Define a linear model using the supplied function

        The function acts as a drop-in replacement to the predict function and
        must implement the interface:

            (
                self: LinearModel,
                df: Union[pd.DataFrame, pd.Series],
                rng: Optional[np.random.RandomState] = None,
                **kwargs
            ) -> pd.Series

        It is the responsibility of the caller of predict to ensure they pass
        either a dataframe or an individual record as expected by the custom
        function.

        See ``test_custom()`` for a couple of examples.
        """
        # create an instance of a custom linear model
        custom_model = LinearModel(LinearModelType.CUSTOM, 0)
        # replace this instance's predict method by binding the supplied
        # function to the instance through the descriptor protocol
        custom_model.predict = predict_function.__get__(custom_model, LinearModel)
        # save value to any keyword arguments inside of this linear model
        for k, v in kwargs.items():
            # check the name doesn't already exist
            assert not hasattr(custom_model, k), (
                f"Cannot store argument '{k}' as name already exists; change name.")
            setattr(custom_model, k, v)
        return custom_model

    def _parse_predictors(self):
        """Set model string, callback predictors and predictor names from predictors.

        Sets `self._model_string` to an expression string (to be evaluated by
        ``pandas.eval``) corresponding to the evaluation of the model output
        for the subset of the predictors which do not define a custom callback
        function and the model intercept, or an empty string if no non-callback
        predictors are present.

        Additionally sets `self._callback_predictors` to a tuple of the omitted
        predictors with custom callback functions and `self._predictor_names`
        to a set of strings corresponding to names specified in the predictors.
        """
        # For additive models a zero coefficient corresponds to no effect while for
        # multiplicative and logistic models the relevant value is one
        null_coeff_value = 0 if self.lm_type == LinearModelType.ADDITIVE else 1
        predictor_strings = []
        callback_predictors = []
        self._predictor_names = set()
        for predictor in self.predictors:
            if predictor.callback is not None:
                self._predictor_names.add(predictor.property_name)
                callback_predictors.append(predictor)
                continue
            if len(predictor.conditions) == 0:
                # A predictor with neither conditions nor a callback has no
                # effect on the output. Skipping it avoids building its term
                # from variables left over from a previous loop iteration.
                continue
            if predictor.property_name is not None:
                self._predictor_names.add(predictor.property_name)
            else:
                # If no property_name specified, predictor conditions will
                # contain one or more column names therefore parse condition
                # strings and filter for all name nodes. This will also
                # add non-column names such as builtin functions so need to
                # check if names are actually columns before using
                for condition, _ in predictor.conditions:
                    self._predictor_names.update(
                        node.id
                        for node in ast.walk(ast.parse(condition))
                        if isinstance(node, ast.Name)
                    )
            has_catch_all_condition = False
            for i, (condition, value) in enumerate(predictor.conditions):
                if i == 0:
                    if condition is None:
                        # 'otherwise' fallback condition - always True. If used as
                        # first condition any other conditions will be ignored as
                        # this condition matches all
                        predictor_str = f"{value}"
                        any_prev_conds = "True"
                        has_catch_all_condition = True
                        break
                    else:
                        predictor_str = f"({condition}) * {value}"
                        any_prev_conds = f"{condition}"
                else:
                    if condition is None:
                        # 'otherwise' fallback condition - matches all not
                        # so far matched therefore can ignore any remaining
                        # conditions
                        predictor_str += f" + (~({any_prev_conds})) * {value}"
                        has_catch_all_condition = True
                        break
                    elif predictor.conditions_are_mutually_exclusive:
                        # conditions have been declared to be mutually exclusive
                        # therefore we can just multiply conditions by coefficient
                        # values as condition == ~any_prev_conds & condition
                        predictor_str += f" + ({condition}) * {value}"
                        any_prev_conds += f" | {condition}"
                    else:
                        # conditions are potentially non-mutually exclusive and
                        # are applied sequentially in order specified on subset
                        # not matching any previous conditions
                        predictor_str += (
                            f" + (~({any_prev_conds}) & {condition}) * {value}")
                        any_prev_conds += f" | {condition}"
            # If the predictor neither declares that the conditions are exhaustive
            # (i.e. all cases are covered and any_prev_conds is guaranteed to be
            # True) nor an 'otherwise' catch-all condition has been used (in which
            # case any_prev_conds is also guaranteed to be True) then add term
            # corresponding to no effect when no previous conditions matched
            if not (predictor.conditions_are_exhaustive or has_catch_all_condition):
                predictor_str += f" + ~({any_prev_conds}) * {null_coeff_value}"
            predictor_strings.append(f"({predictor_str})")
        self._callback_predictors = tuple(callback_predictors)
        if len(predictor_strings) > 0:
            if self.intercept != null_coeff_value:
                # Only need to include intercept if its non-zero in additive models
                # or non-unity in multiplicative/logistic models
                predictor_strings.append(f"{self.intercept}")
            if self.lm_type == LinearModelType.ADDITIVE:
                self._model_string = " + ".join(predictor_strings)
            else:
                self._model_string = " * ".join(predictor_strings)
        else:
            # No expression to evaluate; `predict` starts from the intercept
            self._model_string = ""

    def _get_column_resolvers(
        self, df: pd.DataFrame, **external_variables
    ) -> Dict[str, pd.Series]:
        """Construct mapping from predictor column names to column values.

        For use in ``resolvers`` argument to ``pandas.eval`` call. Compared to
        ``pandas.DataFrame._get_cleaned_column_resolvers()`` here only the
        column names present in the model predictors are included when
        constructing the returned dictionary. For dataframes with a large
        number of columns this is more performant than iterating over all
        columns, of which typically only a small subset are used in each linear
        model. Any external variables specified in predictors are also included
        with dunder-wrapped keys (e.g. '__ext_var__').

        :param df: Dataframe to take the column values from.
        :param external_variables: Values of external variables, keyed by their
            unwrapped names.
        :return: Dictionary mapping names to series.
        """
        column_resolvers = {}
        for name in self._predictor_names:
            # predictor_names may contain built-in names that are not columns
            # therefore we need to check if name is column in dataframe
            col = df.get(name)
            if col is not None:
                cleaned_name = clean_column_name(name)
                if (
                    isinstance(col.dtype, pd.CategoricalDtype)
                    and np.issubdtype(col.dtype.categories.dtype, np.integer)
                ):
                    # `pandas.eval` raises an error when using boolean
                    # operations on series with a categorical dtype with integer
                    # categories therefore if any such columns are present we
                    # convert to the corresponding integer type
                    dtype = col.dtype.categories.dtype
                    column_resolvers[cleaned_name] = col.astype(dtype)
                else:
                    column_resolvers[cleaned_name] = col
        for name, value in external_variables.items():
            column_resolvers[f"__{name}__"] = pd.Series(value, index=df.index)
        return column_resolvers

    def predict(
        self,
        df: pd.DataFrame,
        rng: Optional[np.random.RandomState] = None,
        squeeze_single_row_output=True,
        **kwargs
    ) -> pd.Series:
        """Evaluate linear model output for a given set of input data.

        :param df: The input ``DataFrame`` containing the input data to evaluate
            the model with.
        :param rng: If set to a NumPy ``RandomState`` instance, returned output
            will be boolean ``Series`` corresponding to Bernoulli random
            variables sampled according to probabilities specified by model
            output. Otherwise model output directly returned.
        :param squeeze_single_row_output: If ``rng`` argument is not ``None``
            and this argument is set to ``True``, the output for a ``df`` input
            with a single-row will be a scalar boolean value rather than a
            boolean ``Series``.
        :param **kwargs: Values for any external variables included in model
            predictors.
        """
        # Check that all names specified in predictors are either a column name, an
        # external variable in kwargs (with __ prefix/suffix removed) or a built-in.
        # The wrapper is removed by slicing because str.strip would also remove
        # underscores that are part of the external variable's own name.
        for name in self._predictor_names:
            assert (
                name in df
                or (
                    name.startswith("__")
                    and name.endswith("__")
                    and name[2:-2] in kwargs
                )
                or name in builtins.__dict__
            ), (
                f"Predictors include unknown name {name}"
            )
        column_resolvers = self._get_column_resolvers(df, **kwargs)
        if self._model_string != "":
            result = pd.eval(
                self._model_string, resolvers=(column_resolvers,), engine="python"
            )
            if not isinstance(result, pd.Series):
                # An expression that references no columns (e.g. a predictor
                # consisting only of an `otherwise` term) evaluates to a scalar;
                # broadcast it so the rest of the method can assume a Series
                result = pd.Series(result, index=df.index)
        else:
            result = pd.Series(data=self.intercept, index=df.index)
        if len(self._callback_predictors) > 0:
            callback_results = [
                column_resolvers[p.property_name].apply(p.callback)
                for p in self._callback_predictors
            ]
            if self.lm_type == LinearModelType.ADDITIVE:
                result = result + sum(callback_results)
            else:
                result = result * prod(callback_results)
        # Ensure result of floating point type even if all predictor coefficients
        # are integer but intercept is floating point
        if isinstance(self.intercept, float) and result.dtype == int:
            result = result.astype(float)
        # Result series sometimes picks up name from one of predictors - set to
        # None so comparisons with unnamed series in tests pass
        result.name = None
        if self.lm_type == LinearModelType.LOGISTIC:
            result = result / (1 + result)
        # If the user supplied a random number generator then they want outcomes,
        # not probabilities
        if rng is not None:
            outcome = rng.random_sample(len(result)) < result
            # pop the boolean out of the series if we have a single row,
            # otherwise return the series
            if len(outcome) == 1 and squeeze_single_row_output:
                return outcome.iloc[0]
            return outcome
        return result

    def __str__(self):
        """Return a multi-line description of the model type, intercept and predictors."""
        lines = [
            "LinearModel(\n",
            f" {self.lm_type},\n",
            f" intercept = {self.intercept},\n",
        ]
        lines.extend(f' {predictor}\n' for predictor in self.predictors)
        lines.append(")")
        return "".join(lines)