Source code for tlo.methods.healthsystem

import datetime
import heapq as hp
import itertools
import re
import warnings
from collections import Counter, defaultdict
from import Iterable
from itertools import repeat
from pathlib import Path
from typing import Dict, List, NamedTuple, Optional, Tuple, Union

import numpy as np
import pandas as pd
from pandas.testing import assert_series_equal

import tlo
from tlo import Date, DateOffset, Module, Parameter, Property, Types, logging
from tlo.analysis.utils import (  # get_filtered_treatment_ids,
from import Event, PopulationScopeEventMixin, Priority, RegularEvent
from tlo.methods import Metadata
from tlo.methods.bed_days import BedDays
from tlo.methods.consumables import (
from tlo.methods.dxmanager import DxManager
from import Equipment
from tlo.methods.hsi_event import (

logger = logging.getLogger(__name__)

logger_summary = logging.getLogger(f"{__name__}.summary")

# Declare the assumption for the availability of consumables at the merged levels '1b' and '2'. This can be a
#  list of facility_levels over which an average is taken (within a district): e.g. ['1b', '2'].
AVAILABILITY_OF_CONSUMABLES_AT_MERGED_LEVELS_1B_AND_2 = ['1b']  # <-- Implies that availability at merged level '1b & 2'
#                                                                     is equal to availability at level '1b'. This is
#                                                                     reasonable because the '1b' are more numerous than
#                                                                     those of '2' and have more overall capacity, so
#                                                                     probably account for the majority of the
#                                                                     interactions.

def pool_capabilities_at_levels_1b_and_2(df_original: pd.DataFrame) -> pd.DataFrame:
    """Return a modified version of the imported capabilities DataFrame in which the capabilities of level '1b'
    are pooled with those of level '2' and attributed to the merged level.

    The merged level is labelled with the module-level constant ``LABEL_FOR_MERGED_FACILITY_LEVELS_1B_AND_2``.
    Rows of the original frame whose (Facility_Level, District, Region, Officer_Category) combination has no pooled
    total after the relabelling are retained, with ``Total_Mins_Per_Day`` and ``Staff_Count`` set to 0.0.

    :param df_original: Capabilities table with (at least) the columns ``Facility_Level``, ``District``, ``Region``,
        ``Officer_Category``, ``Total_Mins_Per_Day`` and ``Staff_Count``. It is not modified.
    :returns: A new DataFrame with the same shape, columns and dtypes as ``df_original``.
    :raises AssertionError: If any of the internal consistency checks on the pooled frame fails.
    """
    key_columns = ['Facility_Level', 'District', 'Region', 'Officer_Category']
    pooled_columns = ['Total_Mins_Per_Day', 'Staff_Count']

    # Find total minutes and staff count after the re-allocation of capabilities from '1b' to '2'
    tots_after_reallocation = (
        df_original
        .assign(Facility_Level=lambda df: df.Facility_Level.replace({
            '1b': LABEL_FOR_MERGED_FACILITY_LEVELS_1B_AND_2,
            '2': LABEL_FOR_MERGED_FACILITY_LEVELS_1B_AND_2,
        }))
        .groupby(by=key_columns, dropna=False)[pooled_columns]
        .sum()
        .reset_index()  # turn the group keys back into columns so that they can be merged on
    )

    # Construct a new version of the dataframe that uses the new totals. A left-merge keeps every original row;
    # rows with no pooled total come back as NaN and are set to zero.
    df_updated = (
        df_original
        .drop(columns=pooled_columns)
        .merge(tots_after_reallocation, on=key_columns, how='left')
        .assign(
            Total_Mins_Per_Day=lambda df: df.Total_Mins_Per_Day.fillna(0.0),
            Staff_Count=lambda df: df.Staff_Count.fillna(0.0),
        )
    )

    # Dropping and re-merging moves the pooled columns to the end: restore the original column order so that the
    # shape/dtype/equality checks below compare like with like.
    df_updated = df_updated[list(df_original.columns)]

    # Check that the *total* number of minutes per officer in each district/region is the same as before the change
    assert_series_equal(
        df_updated.groupby(by=['District', 'Region', 'Officer_Category'], dropna=False)['Total_Mins_Per_Day'].sum(),
        df_original.groupby(by=['District', 'Region', 'Officer_Category'], dropna=False)['Total_Mins_Per_Day'].sum()
    )

    # Check size/shape of the updated dataframe is as expected
    assert df_updated.shape == df_original.shape
    assert (df_updated.dtypes == df_original.dtypes).all()

    # Levels that are not involved in the pooling must be untouched
    for _level in ['0', '1a', '3', '4']:
        assert df_original.loc[df_original.Facility_Level == _level].equals(
            df_updated.loc[df_updated.Facility_Level == _level])

    # All of the time recorded against '1b' or '2' must now sit under the merged label.
    # NOTE(review): both sides of this comparison are computed from `df_updated` -- confirm that is intended.
    assert np.isclose(
        df_updated.loc[df_updated.Facility_Level == LABEL_FOR_MERGED_FACILITY_LEVELS_1B_AND_2,
                       'Total_Mins_Per_Day'].sum(),
        df_updated.loc[df_updated.Facility_Level.isin(['1b', '2']), 'Total_Mins_Per_Day'].sum()
    )

    return df_updated

class AppointmentSubunit(NamedTuple):
    """The part of an appointment that is attributable to one specific officer type.

    Instances are immutable ``(officer_type, time_taken)`` pairs.
    """

    # Name of the officer type that this component of the appointment refers to
    officer_type: str
    # Time taken for this component (units are not stated here; presumably minutes -- TODO confirm)
    time_taken: float

def _accepts_argument(function: callable, argument: str) -> bool:
    """Helper to test if callable object accepts an argument with a given name.

    Compared to using `inspect.signature` or `inspect.getfullargspec` the approach here
    has significantly less overhead (as a full `Signature` or `FullArgSpec` object
    does not need to constructed) but is also less readable hence why it has been
    wrapped as a helper function despite being only one-line to make its functionality
    more obvious.

    :param function: Callable object to check if argument is present in.
    :param argument: Name of argument to check.
    :returns: ``True`` is ``argument`` is an argument of ``function`` else ``False``.
    # co_varnames include both arguments to function and any internally defined variable
    # names hence we check only in the first `co_argcount` items which correspond to
    # just the arguments
    return argument in function.__code__.co_varnames[:function.__code__.co_argcount]

[docs] class HealthSystem(Module): """ INIT_DEPENDENCIES = {'Demography'} PARAMETERS = { # Organization of the HealthSystem 'Master_Facilities_List': Parameter(Types.DATA_FRAME, 'Listing of all health facilities.'), # Definitions of the officers and appointment types 'Officer_Types_Table': Parameter(Types.DATA_FRAME, 'The names of the types of health workers ("officers")'), 'Appt_Types_Table': Parameter(Types.DATA_FRAME, 'The names of the type of appointments with the health system'), 'Appt_Offered_By_Facility_Level': Parameter( Types.DATA_FRAME, 'Table indicating whether or not each appointment is offered at each facility level.'), 'Appt_Time_Table': Parameter(Types.DATA_FRAME, 'The time taken for each appointment, according to officer and facility type.'), # Capabilities of the HealthSystem (under alternative assumptions) 'Daily_Capabilities_actual': Parameter( Types.DATA_FRAME, 'The capabilities (minutes of time available of each type of officer in each facility) ' 'based on the _estimated current_ number and distribution of staff estimated.'), 'Daily_Capabilities_funded': Parameter( Types.DATA_FRAME, 'The capabilities (minutes of time available of each type of officer in each facility) ' 'based on the _potential_ number and distribution of staff estimated (i.e. those ' 'positions that can be funded).'), 'Daily_Capabilities_funded_plus': Parameter( Types.DATA_FRAME, 'The capabilities (minutes of time available of each type of officer in each facility) ' 'based on the _potential_ number and distribution of staff estimated, with adjustments ' 'to permit each appointment type that should be run at facility level to do so in every ' 'district.'), 'use_funded_or_actual_staffing': Parameter( Types.STRING, "If `actual`, then use the numbers and distribution of staff estimated to be available" " currently; If `funded`, then use the numbers and distribution of staff that are " "potentially available. 
If 'funded_plus`, then use a dataset in which the allocation of " "staff to facilities is tweaked so as to allow each appointment type to run at each " "facility_level in each district for which it is defined. N.B. This parameter is " "over-ridden if an argument is provided to the module initialiser.", # N.B. This could have been of type `Types.CATEGORICAL` but this made over-writing through `Scenario` # difficult, due to the requirement that the over-writing value and original value are of the same type # (enforced at line 376 of ), # Consumables 'item_and_package_code_lookups': Parameter( Types.DATA_FRAME, 'Data imported from the OneHealth Tool on consumable items, packages and costs.'), 'availability_estimates': Parameter( Types.DATA_FRAME, 'Estimated availability of consumables in the LMIS dataset.'), 'cons_availability': Parameter( Types.STRING, "Availability of consumables. If 'default' then use the availability specified in the ResourceFile; if " "'none', then let no consumable be ever be available; if 'all', then all consumables are always available." " When using 'all' or 'none', requests for consumables are not logged. NB. This parameter is over-ridden" "if an argument is provided to the module initialiser."), # Infrastructure and Equipment 'BedCapacity': Parameter( Types.DATA_FRAME, "Data on the number of beds available of each type by facility_id"), 'beds_availability': Parameter( Types.STRING, "Availability of beds. If 'default' then use the availability specified in the ResourceFile; if " "'none', then let no beds be ever be available; if 'all', then all beds are always available. NB. This " "parameter is over-ridden if an argument is provided to the module initialiser."), 'EquipmentCatalogue': Parameter( Types.DATA_FRAME, "Data on equipment items and packages."), 'equipment_availability_estimates': Parameter( Types.DATA_FRAME, "Data on the availability of equipment items and packages." 
), 'equip_availability': Parameter( Types.STRING, "What to assume about the availability of equipment. If 'default' then use the availability specified in " "the ResourceFile; if 'none', then let no equipment ever be available; if 'all', then all equipment is " "always available. NB. This parameter is over-ridden if an argument is provided to the module initialiser." ), 'equip_availability_postSwitch': Parameter( Types.STRING, "What to assume about the availability of equipment after the switch (see `year_equip_availability_switch`" "). The options for this are the same as `equip_availability`." ), 'year_equip_availability_switch': Parameter( Types.INT, "Year in which the assumption for `equip_availability` changes (The change happens on 1st January of that " "year.)" ), # Service Availability 'Service_Availability': Parameter( Types.LIST, 'List of services to be available. NB. This parameter is over-ridden if an argument is provided' ' to the module initialiser.'), 'policy_name': Parameter( Types.STRING, "Name of priority policy adopted"), 'year_mode_switch': Parameter( Types.INT, "Year in which mode switch is enforced"), 'scale_to_effective_capabilities': Parameter( Types.BOOL, "In year in which mode switch takes place, will rescale available capabilities to match those" "that were effectively used (on average) in the past year if this is set to True. This way," "we can approximate overtime and rushing of appts even in mode 2."), 'year_cons_availability_switch': Parameter( Types.INT, "Year in which consumable availability switch is enforced. 
The change happens" "on 1st January of that year.)"), 'priority_rank': Parameter( Types.DICT, "Data on the priority ranking of each of the Treatment_IDs to be adopted by " " the queueing system under different policies, where the lower the number the higher" " the priority, and on which categories of individuals classify for fast-tracking " " for specific treatments"), 'HR_scaling_by_level_and_officer_type_table': Parameter( Types.DICT, "Factors by which capabilities of medical officer types at different levels will be" "scaled at the start of the year specified by `year_HR_scaling_by_level_and_officer_type`. This" "serves to simulate a number of effects (e.g. absenteeism, boosting capabilities of specific " "medical cadres, etc). This is the imported from an Excel workbook: keys are the worksheet " "names and values are the worksheets in the format of pd.DataFrames. Additional scenarios can " "be added by adding worksheets to this workbook: the value of " "`HR_scaling_by_level_and_officer_type_mode` indicates which sheet is used." ), 'year_HR_scaling_by_level_and_officer_type': Parameter( Types.INT, "Year in which one-off constant HR scaling will take place. (The change happens" "on 1st January of that year.)" ), 'HR_scaling_by_level_and_officer_type_mode': Parameter( Types.STRING, "Mode of HR scaling considered at the start of the simulation. This corresponds to the name" "of the worksheet in `ResourceFile_HR_scaling_by_level_and_officer_type.xlsx` that should be" " used. 
Options are: `default` (capabilities are scaled by a constaint factor of 1); `data` " "(factors informed by survey data); and, `custom` (user can freely set these factors as " "parameters in the analysis).", ), 'HR_scaling_by_district_table': Parameter( Types.DICT, "Factors by which daily capabilities in different districts will be" "scaled at the start of the year specified by year_HR_scaling_by_district to simulate" "(e.g., through catastrophic event disrupting delivery of services in particular district(s))." "This is the import of an Excel workbook: keys are the worksheet names and values are the " "worksheets in the format of pd.DataFrames. Additional scenarios can be added by adding " "worksheets to this workbook: the value of `HR_scaling_by_district_mode` indicates which" "sheet is used." ), 'year_HR_scaling_by_district': Parameter( Types.INT, "Year in which scaling of daily capabilities by district will take place. (The change happens" "on 1st January of that year.)"), 'HR_scaling_by_district_mode': Parameter( Types.STRING, "Mode of scaling of daily capabilities by district. This corresponds to the name of the " "worksheet in the file `ResourceFile_HR_scaling_by_district.xlsx`." ), 'yearly_HR_scaling': Parameter( Types.DICT, "Factors by which HR capabilities are scaled. " "Each sheet specifies a 'mode' for dynamic HR scaling. The mode to use is determined by the " "parameter `yearly_HR_scaling_mode`. Each sheet must have the same format, including the same " "column headers. On each sheet, the first row (for `2010`, when the simulation starts) " "specifies the initial configuration: `dynamic_HR_scaling_factor` (float) is the factor by " "which all human resoucres capabilities and multiplied; `scale_HR_by_popsize` (bool) specifies " "whether the capabilities should (also) grow by the factor by which the population has grown in" " the last year. Each subsequent row specifies a year where there should be a CHANGE in the " "configuration. 
If there are no further rows, then there is no change. But, for example, an" " additional row of the form ```2015, 1.05, TRUE``` would mean that on 1st January of 2015, " "2016, 2017, ....(and the rest of the simulation), the capabilities would increase by the " "product of 1.05 and by the ratio of the population size to that in the year previous." ), 'yearly_HR_scaling_mode': Parameter( Types.STRING, "Specifies which of the policies in yearly_HR_scaling should be adopted. This corresponds to" "a worksheet of the file `ResourceFile_dynamic_HR_scaling.xlsx`." ), 'tclose_overwrite': Parameter( Types.INT, "Decide whether to overwrite tclose variables assigned by disease modules"), 'tclose_days_offset_overwrite': Parameter( Types.INT, "Offset in days from topen at which tclose will be set by the healthsystem for all HSIs" "if tclose_overwrite is set to True."), # Mode Appt Constraints 'mode_appt_constraints': Parameter( Types.INT, 'Integer code in `{0, 1, 2}` determining mode of constraints with regards to officer numbers ' 'and time - 0: no constraints, all HSI events run with no squeeze factor, 1: elastic constraints' ', all HSI events run with squeeze factor, 2: hard constraints, only HSI events with no squeeze ' 'factor run. N.B. This parameter is over-ridden if an argument is provided' ' to the module initialiser.', ), 'mode_appt_constraints_postSwitch': Parameter( Types.INT, 'Mode considered after a mode switch in year_mode_switch.'), 'cons_availability_postSwitch': Parameter( Types.STRING, 'Consumables availability after switch in `year_cons_availability_switch`. Acceptable values' 'are the same as those for Parameter `cons_availability`.') } PROPERTIES = { 'hs_is_inpatient': Property( Types.BOOL, 'Whether or not the person is currently an in-patient at any medical facility' ), }
[docs] def __init__( When using 'all' or 'none', requests for consumables are not logged. :param beds_availability: If 'default' then use the availability specified in the ResourceFile; if 'none', then let no beds be ever be available; if 'all', then all beds are always available. :param equip_availability: If 'default' then use the availability specified in the ResourceFile; if 'none', then let no equipment ever be available; if 'all', then all equipment is always available. :param randomise_queue ensure that the queue is not model-dependent, i.e. properly randomised for equal topen and priority :param ignore_priority: If ``True`` do not use the priority information in HSI event to schedule :param policy_name: Name of priority policy adopted :param capabilities_coefficient: Multiplier for the capabilities of health officers, if ``None`` set to ratio of initial population to estimated 2010 population. :param use_funded_or_actual_staffing: If `actual`, then use the numbers and distribution of staff estimated to be available currently; If `funded`, then use the numbers and distribution of staff that are potentially available. If 'funded_plus`, then use a dataset in which the allocation of staff to facilities is tweaked so as to allow each appointment type to run at each facility_level in each district for which it is defined. :param disable: If ``True``, disables the health system (no constraints and no logging) and every HSI event runs. :param disable_and_reject_all: If ``True``, disable health system and no HSI events run :param compute_squeeze_factor_to_district_level: Whether to compute squeeze_factors to the district level, or the national level (which effectively pools the resources across all districts). :param hsi_event_count_log_period: Period over which to accumulate counts of HSI events that have run before logging and reseting counters. Should be on of strings ``'day'``, ``'month'``, ``'year'``. 
``'simulation'`` to log at the end of each day, end of each calendar month, end of each calendar year or the end of the simulation respectively, or ``None`` to not track the HSI event details and frequencies. """ super().__init__(name) self.resourcefilepath = resourcefilepath assert isinstance(disable, bool) assert isinstance(disable_and_reject_all, bool) assert not (disable and disable_and_reject_all), ( 'Cannot have both disable and disable_and_reject_all selected' ) assert not (ignore_priority and policy_name is not None), ( 'Cannot adopt a priority policy if the priority will be then ignored' ) self.disable = disable self.disable_and_reject_all = disable_and_reject_all self.mode_appt_constraints = None # Will be the final determination of the `mode_appt_constraints' if mode_appt_constraints is not None: assert mode_appt_constraints in {0, 1, 2} self.arg_mode_appt_constraints = mode_appt_constraints self.rng_for_hsi_queue = None # Will be a dedicated RNG for the purpose of randomising the queue self.rng_for_dx = None # Will be a dedicated RNG for the purpose of determining Dx Test results self.randomise_queue = randomise_queue self.ignore_priority = ignore_priority # This default value will be overwritten if assumed policy is not None self.lowest_priority_considered = 2 # Check that the name of policy being evaluated is included self.priority_policy = None if policy_name is not None: assert policy_name in ['', 'Default', 'Test', 'Test Mode 1', 'Random', 'Naive', 'RMNCH', 'VerticalProgrammes', 'ClinicallyVulnerable', 'EHP_III', 'LCOA_EHP'] self.arg_policy_name = policy_name self.tclose_overwrite = None self.tclose_days_offset_overwrite = None # Store the fast tracking channels that will be relevant for policy given the modules included self.list_fasttrack = [] # provided so that there is a default even before simulation is run # Store the argument provided for service_availability self.arg_service_availability = service_availability self.service_availability = 
['*'] # provided so that there is a default even before simulation is run # Check that the capabilities coefficient is correct if capabilities_coefficient is not None: assert capabilities_coefficient >= 0 assert isinstance(capabilities_coefficient, float) self.capabilities_coefficient = capabilities_coefficient # Find which set of assumptions to use - those for the actual staff available or the funded staff available if use_funded_or_actual_staffing is not None: assert use_funded_or_actual_staffing in ['actual', 'funded', 'funded_plus'] self.arg_use_funded_or_actual_staffing = use_funded_or_actual_staffing # Define (empty) list of registered disease modules (filled in at `initialise_simulation`) self.recognised_modules_names = [] # Define the container for calls for health system interaction events self.HSI_EVENT_QUEUE = [] self.hsi_event_queue_counter = 0 # Counter to help with the sorting in the heapq # Store the arguments provided for cons/beds/equip_availability assert cons_availability in (None, 'default', 'all', 'none') self.arg_cons_availability = cons_availability assert beds_availability in (None, 'default', 'all', 'none') self.arg_beds_availability = beds_availability assert equip_availability in (None, 'default', 'all', 'none') self.arg_equip_availability = equip_availability # `compute_squeeze_factor_to_district_level` is a Boolean indicating whether the computation of squeeze_factors # should be specific to each district (when `True`), or if the computation of squeeze_factors should be on the # basis that resources from all districts can be effectively "pooled" (when `False). 
assert isinstance(compute_squeeze_factor_to_district_level, bool) self.compute_squeeze_factor_to_district_level = compute_squeeze_factor_to_district_level # Create the Diagnostic Test Manager to store and manage all Diagnostic Test self.dx_manager = DxManager(self) # Create the pointer that will be to the instance of BedDays used to track in-patient bed days self.bed_days = None # Create the pointer that will be to the instance of Consumables used to determine availability of consumables. self.consumables = None # Create pointer for the HealthSystemScheduler event self.healthsystemscheduler = None # Create pointer to the `HealthSystemSummaryCounter` helper class self._summary_counter = HealthSystemSummaryCounter() # Create counter for the running total of footprint of all the HSIs being run today self.running_total_footprint: Counter = Counter() self._hsi_event_count_log_period = hsi_event_count_log_period if hsi_event_count_log_period in {"day", "month", "year", "simulation"}: # Counters for binning HSI events run (by unique integer keys) over # simulation period specified by hsi_event_count_log_period and cumulative # counts over previous log periods self._hsi_event_counts_log_period = Counter() self._hsi_event_counts_cumulative = Counter() # Dictionary mapping from HSI event details to unique integer keys self._hsi_event_details = dict() # Counters for binning HSI events that never ran (by unique integer keys) over # simulation period specified by hsi_event_count_log_period and cumulative # counts over previous log periods self._never_ran_hsi_event_counts_log_period = Counter() self._never_ran_hsi_event_counts_cumulative = Counter() # Dictionary mapping from HSI event details to unique integer keys self._never_ran_hsi_event_details = dict() elif hsi_event_count_log_period is not None: raise ValueError( "hsi_event_count_log_period argument should be one of 'day', 'month' " "'year', 'simulation' or None." )
[docs] def read_parameters(self, data_folder):
[docs] def pre_initialise_population(self):
[docs] def initialise_population(self, population):
[docs] def initialise_simulation(self, sim):
[docs] def on_birth(self, mother_id, child_id):
[docs] def on_simulation_end(self):
[docs] def setup_priority_policy(self):
[docs] def process_human_resources_files(self, use_funded_or_actual_staffing: str):
[docs] def format_daily_capabilities(self, use_funded_or_actual_staffing: str) -> pd.Series:
[docs] def _rescale_capabilities_to_capture_effective_capability(self):
[docs] def update_consumables_availability_to_represent_merging_of_levels_1b_and_2(self, df_original):
[docs] def get_service_availability(self) -> List[str]:
[docs] def get_cons_availability(self) -> str:
[docs] def get_beds_availability(self) -> str:
[docs] def get_equip_availability(self) -> str:
[docs] def schedule_to_call_never_ran_on_date(self, hsi_event: 'HSI_Event', tdate: datetime.datetime):
[docs] def get_mode_appt_constraints(self) -> int:
[docs] def get_use_funded_or_actual_staffing(self) -> str:
[docs] def get_priority_policy_initial(self) -> str:
[docs] def load_priority_policy(self, policy):
[docs] def schedule_hsi_event(
[docs] def _add_hsi_event_queue_item_to_hsi_event_queue(self, priority, topen, tclose, hsi_event) -> None:
# This is where the priority policy is enacted
[docs] def enforce_priority_policy(self, hsi_event) -> int:
[docs] def check_hsi_event_is_valid(self, hsi_event):
[docs] @staticmethod
    def is_treatment_id_allowed(treatment_id: str, service_availability: list) -> bool:
[docs] def schedule_batch_of_individual_hsi_events(
[docs] def appt_footprint_is_valid(self, appt_footprint):
@property
def capabilities_today(self) -> pd.Series:
    """
    Returns the capabilities of the health system today.

    returns: pd.Series giving minutes available for each officer type in each facility type

    Functions can go in here in the future that could expand the time available,
    simulating increasing efficiency (the concept of a productivity ratio raised
    by Martin Chalkley).

    For now this method only multiplies the estimated minutes available by the
    `capabilities_coefficient` scale factor.
    """
    return self._daily_capabilities * self.capabilities_coefficient
[docs] def get_blank_appt_footprint(self):
[docs] def get_facility_info(self, hsi_event) -> FacilityInfo:
[docs] def get_appt_footprint_as_time_request(self, facility_info: FacilityInfo, appt_footprint: dict):
[docs] def get_squeeze_factors(self, footprints_per_event, total_footprint, current_capabilities, compute_squeeze_factor_to_district_level: bool
[docs] def record_hsi_event(self, hsi_event, actual_appt_footprint=None, squeeze_factor=None, did_run=True, priority=None):
[docs] def write_to_hsi_log(
[docs] def call_and_record_never_ran_hsi_event(self, hsi_event, priority=None):
[docs] def write_to_never_ran_hsi_log(
[docs] def log_current_capabilities_and_usage(self):
[docs] def remove_beddays_footprint(self, person_id):
[docs] def find_events_for_person(self, person_id: int):
[docs] def reset_queue(self):
[docs] def get_item_codes_from_package_name(self, package: str) -> dict:
[docs] def get_item_code_from_item_name(self, item: str) -> int:
[docs] def override_availability_of_consumables(self, item_codes) -> None:
[docs] def _write_hsi_event_counts_to_log_and_reset(self):
[docs] def _write_never_ran_hsi_event_counts_to_log_and_reset(self):
[docs] def on_end_of_day(self) -> None:
[docs] def on_end_of_month(self) -> None:
[docs] def on_end_of_year(self) -> None:
[docs] def run_population_level_events(self, _list_of_population_hsi_event_tuples: List[HSIEventQueueItem]) -> None:
[docs] def run_individual_level_events_in_mode_0_or_1(self, _list_of_individual_hsi_event_tuples: List[HSIEventQueueItem]) -> List:
@property
def hsi_event_counts(self) -> Counter:
    """Counts of details of HSI events which have run so far in simulation.

    Returns a ``Counter`` instance with keys ``HSIEventDetail`` named tuples
    corresponding to details of HSI events that have run over simulation so far.
    An empty ``Counter`` is returned when ``_hsi_event_count_log_period`` is ``None``.
    """
    if self._hsi_event_count_log_period is None:
        return Counter()

    # Part-way through a log period the per-period counter is non-empty, so the overall
    # total is the cumulative counts plus the counts of the current period.
    combined_counts = self._hsi_event_counts_cumulative + self._hsi_event_counts_log_period

    # Counts are stored against integer-like keys; translate back to the event details.
    counts_by_details = {}
    for details, details_key in self._hsi_event_details.items():
        counts_by_details[details] = combined_counts[details_key]
    return Counter(counts_by_details)


@property
def never_ran_hsi_event_counts(self) -> Counter:
    """Counts of details of HSI events which never ran so far in simulation.

    Returns a ``Counter`` instance with keys ``HSIEventDetail`` named tuples
    corresponding to details of HSI events that have never ran over simulation so far.
    An empty ``Counter`` is returned when ``_hsi_event_count_log_period`` is ``None``.
    """
    if self._hsi_event_count_log_period is None:
        return Counter()

    # Part-way through a log period the per-period never-ran counter is non-empty, so the
    # overall total is the cumulative never-ran counts plus those of the current period.
    combined_counts = (
        self._never_ran_hsi_event_counts_cumulative
        + self._never_ran_hsi_event_counts_log_period
    )

    counts_by_details = {}
    for details, details_key in self._never_ran_hsi_event_details.items():
        counts_by_details[details] = combined_counts[details_key]
    return Counter(counts_by_details)
[docs] class HealthSystemScheduler(RegularEvent, PopulationScopeEventMixin):
[docs] def __init__(self, module: HealthSystem):
[docs] @staticmethod
    def _is_last_day_of_the_year(date):
[docs] @staticmethod
    def _is_last_day_of_the_month(date):
[docs] def _get_events_due_today(self,) -> Tuple[List, List]:
[docs] def process_events_mode_0_and_1(self, hold_over: List[HSIEventQueueItem]) -> None:
def process_events_mode_2(self, hold_over: List[HSIEventQueueItem]) -> None:
    """Process the HSI event queue for today when running with ``mode_appt_constraints = 2``.

    The queue (``self.module.HSI_EVENT_QUEUE``) is traversed twice:

    1. While at least one officer still has time available today, events are popped in heap order.
       Expired events are passed to the 'never ran' handling, events whose target is no longer alive
       are dropped, events not yet due are set aside, population-level events due today are collected,
       and individual-level events due today are run immediately if none of the officers they require
       has run out (no squeeze is applied in this mode).
    2. Whatever remains in the queue is then traversed so that expired events are removed, events with a
       priority below ``lowest_priority_considered`` are scheduled to 'never run' on their ``tclose``,
       and ``did_not_run`` is invoked for all individual-level events due today that were postponed.

    Finally, events not yet due are returned to the queue and the population-level events are run.

    :param hold_over: heap (modified in place) onto which individual-level events that were due today
        but did not run are pushed, unless their ``did_not_run`` returned ``False``.
    """

    def _record_did_not_run(event_tuple, squeeze_factor) -> None:
        """Call `did_not_run` for the event, hold it over unless that returns False, and log it."""
        hsi_event = event_tuple.hsi_event
        rtn_from_did_not_run = hsi_event.did_not_run()

        # If received no response from the call to did_not_run, or a True signal, then add to the
        # hold-over queue. Otherwise (disease module returns "FALSE") the event is not rescheduled
        # and will not run.
        if rtn_from_did_not_run is not False:
            hp.heappush(hold_over, event_tuple)

        # Log that the event did not run
        self.module.record_hsi_event(
            hsi_event=hsi_event,
            actual_appt_footprint=hsi_event.EXPECTED_APPT_FOOTPRINT,
            squeeze_factor=squeeze_factor,
            did_run=False,
            priority=event_tuple.priority
        )

    # NOTE(review): the initialisation of these two names was missing from the copy of the source this
    # was restored from. It is reconstructed from how they are used below (`capabilities_monitor` is
    # indexed by officer and has `.subtract(...)`; officers are dropped from the set once their remaining
    # time is <= 0) -- confirm against upstream.
    capabilities_monitor = Counter(self.module.capabilities_today.to_dict())
    set_capabilities_still_available = {
        officer for officer, minutes in capabilities_monitor.items() if minutes > 0.0
    }

    # From benchmarks, converting Series to list before converting to set is ~2x more performant than
    # direct conversion to set, while checking membership of set is ~10x quicker than checking
    # membership of Pandas Index object and ~25x quicker than checking membership of list
    alive_persons = set(
        self.sim.population.props.index[self.sim.population.props.is_alive].to_list()
    )

    list_of_population_hsi_event_tuples_due_today = list()
    list_of_events_not_due_today = list()

    # NOTE(review): throughout this method the expressions `self.sim.date`, `event.target` and the
    # `event.run(...)` call were missing from the copy of the source this was restored from (leaving e.g.
    # `if > next_event_tuple.tclose:`); they have been restored from context -- confirm against upstream.

    # Traverse the queue and run events due today until have capabilities still available
    while len(self.module.HSI_EVENT_QUEUE) > 0:

        # Check if any of the officers in the country are still available for today.
        # If not, no point in going through the queue any longer.
        # This will make things slower for tests/small simulations, but should be of significant help
        # in the case of large simulations in mode_appt_constraints = 2 where number of people in the
        # queue for today >> resources available for that day. This would be faster done by facility.
        if not set_capabilities_still_available:
            # Don't have any capabilities at all left for today, no
            # point in going through the queue to check what's left to do today.
            break

        # Read the tuple and remove from heapq
        next_event_tuple = hp.heappop(self.module.HSI_EVENT_QUEUE)
        event = next_event_tuple.hsi_event

        if self.sim.date > next_event_tuple.tclose:
            # The event has expired (after tclose) having never been run. Call the 'never_ran' function
            self.module.call_and_record_never_ran_hsi_event(
                hsi_event=event,
                priority=next_event_tuple.priority
            )

        elif not (
            isinstance(event.target, tlo.population.Population)
            or event.target in alive_persons
        ):
            # if individual level event and the person who is the target is no longer alive,
            # do nothing more, i.e. remove from heapq
            pass

        elif self.sim.date < next_event_tuple.topen:
            # The event is not yet due (before topen)
            hp.heappush(list_of_events_not_due_today, next_event_tuple)

            if next_event_tuple.priority == self.module.lowest_priority_considered:
                # If the next event is not due and has the lowest allowed priority, then stop looking
                # through the heapq as all other events will also not be due.
                break

        else:
            # The event is now due to run today and the person is confirmed to be still alive.
            # Add it to the list of events due today if at population level.
            # Otherwise, run event immediately.
            is_pop_level_hsi_event = isinstance(event.target, tlo.population.Population)
            if is_pop_level_hsi_event:
                list_of_population_hsi_event_tuples_due_today.append(next_event_tuple)
                continue

            # Retrieve officers&facility required for HSI
            original_call = next_event_tuple.hsi_event.expected_time_requests
            _priority = next_event_tuple.priority

            # In this version of mode_appt_constraints = 2, do not have access to squeeze
            # based on queue information, and we assume no squeeze ever takes place.
            squeeze_factor = 0.

            # If any of the officers required are not available, then out of resources
            out_of_resources = any(
                officer not in set_capabilities_still_available for officer in original_call
            )

            # If officers still available, run event. Note: in current logic, a little
            # overtime is allowed to run last event of the day. This seems more realistic
            # than medical staff leaving earlier than
            # planned if seeing another patient would take them into overtime.
            if out_of_resources:
                # Do not run: call did_not_run for the hsi_event, hold over if required, and log.
                _record_did_not_run(next_event_tuple, squeeze_factor)
                continue

            # Have enough capabilities left to run event

            # Notes-to-self: Shouldn't this be done after checking the footprint?
            # Compute the bed days that are allocated to this HSI and provide this
            # information to the HSI
            if sum(event.BEDDAYS_FOOTPRINT.values()):
                event._received_info_about_bed_days = \
                    self.module.bed_days.issue_bed_days_according_to_availability(
                        # NOTE(review): argument restored (passed positionally) -- confirm against upstream.
                        facility_id=self.module.bed_days.get_facility_id_for_beds(event.target),
                        footprint=event.BEDDAYS_FOOTPRINT
                    )

            # Check that a facility has been assigned to this HSI
            assert event.facility_info is not None, \
                f"Cannot run HSI {event.TREATMENT_ID} without facility_info being defined."

            # Check if equipment declared is available. If not, call `never_ran` and do not run the
            # event. (`continue` returns flow to beginning of the `while` loop)
            if not event.is_all_declared_equipment_available:
                self.module.call_and_record_never_ran_hsi_event(
                    hsi_event=event,
                    priority=next_event_tuple.priority
                )
                continue

            # Expected appt footprint before running event
            _appt_footprint_before_running = event.EXPECTED_APPT_FOOTPRINT

            # Run event & get actual footprint
            actual_appt_footprint = event.run(squeeze_factor=squeeze_factor)

            # Check if the HSI event returned updated_appt_footprint, and if so adjust original_call
            if actual_appt_footprint is not None:
                # check its formatting:
                assert self.module.appt_footprint_is_valid(actual_appt_footprint)

                # Update call that will be used to compute capabilities used
                updated_call = self.module.get_appt_footprint_as_time_request(
                    facility_info=event.facility_info,
                    appt_footprint=actual_appt_footprint
                )
            else:
                actual_appt_footprint = _appt_footprint_before_running
                updated_call = original_call

            # Recalculate call on officers based on squeeze factor.
            for k in updated_call.keys():
                updated_call[k] = updated_call[k] / (squeeze_factor + 1.)

            # Subtract this from capabilities used so-far today
            capabilities_monitor.subtract(updated_call)

            # If any of the officers have run out of time by performing this hsi,
            # remove them from list of available officers.
            for officer in updated_call:
                if capabilities_monitor[officer] <= 0:
                    if officer in set_capabilities_still_available:
                        set_capabilities_still_available.remove(officer)
                    else:
                        # (A space was missing between the two halves of this message.)
                        logger.warning(
                            key="message",
                            data=(f"{event.TREATMENT_ID} actual_footprint requires different "
                                  f"officers than expected_footprint.")
                        )

            # Update today's footprint based on actual call and squeeze factor
            self.module.running_total_footprint -= original_call
            self.module.running_total_footprint += updated_call

            # Write to the log
            self.module.record_hsi_event(
                hsi_event=event,
                actual_appt_footprint=actual_appt_footprint,
                squeeze_factor=squeeze_factor,
                did_run=True,
                priority=_priority
            )

    # In previous iteration, we stopped querying the queue once capabilities
    # were exhausted, so here we traverse the queue again to ensure that if any events expired were
    # left unchecked they are properly removed from the queue, and did_not_run() is invoked for all
    # postponed events. (This should still be more efficient than querying the queue as done in
    # mode_appt_constraints = 0 and 1 while ensuring mid-day effects are avoided.)
    # We also schedule a call_never_run for any HSI below the lowest_priority_considered,
    # in case any of them were left in the queue due to a transition from mode 0/1 to mode 2
    while len(self.module.HSI_EVENT_QUEUE) > 0:

        # Read the tuple and remove from heapq
        next_event_tuple = hp.heappop(self.module.HSI_EVENT_QUEUE)
        event = next_event_tuple.hsi_event

        # If the priority of the event is lower than lowest_priority_considered, schedule a call_never_ran
        # on tclose regardless of whether appt is due today or any other time. (Although in mode 2 HSIs with
        # priority > lowest_priority_considered are never added to the queue, some such HSIs may still be
        # present in the queue if mode 2 was preceded by a period in mode 1).
        if next_event_tuple.priority > self.module.lowest_priority_considered:
            self.module.schedule_to_call_never_ran_on_date(hsi_event=event,
                                                           tdate=next_event_tuple.tclose)

        elif self.sim.date > next_event_tuple.tclose:
            # The event has expired (after tclose) having never been run. Call the 'never_ran' function
            self.module.call_and_record_never_ran_hsi_event(
                hsi_event=event,
                priority=next_event_tuple.priority
            )

        elif not (
            isinstance(event.target, tlo.population.Population)
            or event.target in alive_persons
        ):
            # if individual level event and the person who is the target is no longer alive,
            # do nothing more, i.e. remove from heapq
            pass

        elif self.sim.date < next_event_tuple.topen:
            # The event is not yet due (before topen). Do not stop querying the queue here if we have
            # reached the lowest_priority_considered, as we want to make sure HSIs with lower priority
            # (which may have been scheduled during a prior mode 0/1 period) are flushed from the queue.
            hp.heappush(list_of_events_not_due_today, next_event_tuple)

        else:
            # Add it to the list of events due today if at population level.
            is_pop_level_hsi_event = isinstance(event.target, tlo.population.Population)
            if is_pop_level_hsi_event:
                list_of_population_hsi_event_tuples_due_today.append(next_event_tuple)
            else:
                # In previous iteration, have already run all the events for today that could run
                # given capabilities available, so put back any remaining events due today to the
                # hold_over queue as it would not be possible to run them today.
                _record_did_not_run(next_event_tuple, 0)

    # add events from the list_of_events_not_due_today back into the queue
    while len(list_of_events_not_due_today) > 0:
        hp.heappush(self.module.HSI_EVENT_QUEUE, hp.heappop(list_of_events_not_due_today))

    # Run the list of population-level HSI events
    self.module.run_population_level_events(list_of_population_hsi_event_tuples_due_today)
[docs] def apply(self, population):
# ---------------------------------------------------------------------------
#   Logging
# ---------------------------------------------------------------------------


class HealthSystemSummaryCounter:
    """Helper class to keep running counts of HSI and the state of the HealthSystem and logging summaries."""

    def __init__(self):
        self._reset_internal_stores()

    def _reset_internal_stores(self) -> None:
        """Create empty versions of the data structures used to store the running records."""

        self._treatment_ids = defaultdict(int)  # Running record of the `TREATMENT_ID`s of `HSI_Event`s
        self._appts = defaultdict(int)  # Running record of the Appointments of `HSI_Event`s that have run
        self._appts_by_level = {
            _level: defaultdict(int) for _level in ('0', '1a', '1b', '2', '3', '4')
        }  # <--Same as `self._appts` but also split by facility_level

        # Log HSI_Events that never ran to monitor shortcoming of Health System
        self._never_ran_treatment_ids = defaultdict(int)  # As above, but for `HSI_Event`s that never ran
        self._never_ran_appts = defaultdict(int)  # As above, but for `HSI_Event`s that have never ran
        self._never_ran_appts_by_level = {
            _level: defaultdict(int) for _level in ('0', '1a', '1b', '2', '3', '4')
        }

        self._frac_time_used_overall = []  # Running record of the usage of the healthcare system
        self._sum_of_daily_frac_time_used_by_officer_type_and_level = Counter()
        self._squeeze_factor_by_hsi_event_name = defaultdict(list)  # Running record the squeeze-factor
        #                                                             applying to each treatment_id. Key is of
        #                                                             the form: "<TREATMENT_ID>:<HSI_EVENT_NAME>"

    @staticmethod
    def _appt_type_number_pairs(appt_footprint):
        """Return an iterable of ``(appt_type, number)`` pairs for a footprint.

        The footprint may be given either as a mapping (e.g. a ``Counter``, as the annotations on the
        recording methods state) or as an iterable of ``(appt_type, number)`` pairs. Iterating a mapping
        directly would yield only its keys, so mappings are expanded with ``.items()``.
        """
        if isinstance(appt_footprint, dict):
            return appt_footprint.items()
        return appt_footprint

    def record_hsi_event(self,
                         treatment_id: str,
                         hsi_event_name: str,
                         squeeze_factor: float,
                         appt_footprint: Counter,
                         level: str
                         ) -> None:
        """Add information about an `HSI_Event` to the running summaries.

        :param treatment_id: the `TREATMENT_ID` of the event; its count is incremented by one.
        :param hsi_event_name: name of the event, used with `treatment_id` to key the squeeze-factor record.
        :param squeeze_factor: squeeze factor that applied to the event.
        :param appt_footprint: appointment types and their numbers, as a mapping or an iterable of pairs.
        :param level: facility level; must be one of the levels set up in `_reset_internal_stores`.
        """

        # Count the treatment_id:
        self._treatment_ids[treatment_id] += 1

        # Add the squeeze-factor to the list
        self._squeeze_factor_by_hsi_event_name[
            f"{treatment_id}:{hsi_event_name}"
        ].append(squeeze_factor)

        # Count each type of appointment:
        for appt_type, number in self._appt_type_number_pairs(appt_footprint):
            self._appts[appt_type] += number
            self._appts_by_level[level][appt_type] += number

    def record_never_ran_hsi_event(self,
                                   treatment_id: str,
                                   hsi_event_name: str,
                                   appt_footprint: Counter,
                                   level: str
                                   ) -> None:
        """Add information about a never-ran `HSI_Event` to the running summaries.

        :param treatment_id: the `TREATMENT_ID` of the event; its never-ran count is incremented by one.
        :param hsi_event_name: name of the event (accepted for symmetry with `record_hsi_event`; not used).
        :param appt_footprint: appointment types and their numbers, as a mapping or an iterable of pairs.
        :param level: facility level; must be one of the levels set up in `_reset_internal_stores`.
        """

        # Count the treatment_id:
        self._never_ran_treatment_ids[treatment_id] += 1

        # Count each type of appointment:
        for appt_type, number in self._appt_type_number_pairs(appt_footprint):
            self._never_ran_appts[appt_type] += number
            self._never_ran_appts_by_level[level][appt_type] += number

    def record_hs_status(
        self,
        fraction_time_used_across_all_facilities: float,
        fraction_time_used_by_officer_type_and_level: Dict[Tuple[str, int], float],
    ) -> None:
        """Record a current status metric of the HealthSystem.

        Each call appends one entry to the overall record, so the number of calls since the last reset is
        what `frac_time_used_by_officer_type_and_level` later divides by.
        """
        # The fraction of all healthcare worker time that is used:
        self._frac_time_used_overall.append(fraction_time_used_across_all_facilities)
        for officer_type_facility_level, fraction_time in fraction_time_used_by_officer_type_and_level.items():
            self._sum_of_daily_frac_time_used_by_officer_type_and_level[officer_type_facility_level] += fraction_time

    def write_to_log_and_reset_counters(self):
        """Log summary statistics and reset the data structures. This usually occurs at the end of the year."""

        # NOTE(review): the opening `logger_summary.info(` of each of the four calls below was missing from
        # the copy of the source this was restored from (only the keyword arguments remained). They have
        # been restored using the module-level `logger_summary` logger -- confirm against upstream.
        logger_summary.info(
            key="HSI_Event",
            description="Counts of the HSI_Events that have occurred in this calendar year by TREATMENT_ID, "
                        "and counts of the 'Appt_Type's that have occurred in this calendar year,"
                        "and the average squeeze_factor for HSIs that have occurred in this calendar year.",
            data={
                "TREATMENT_ID": self._treatment_ids,
                "Number_By_Appt_Type_Code": self._appts,
                "Number_By_Appt_Type_Code_And_Level": self._appts_by_level,
                # Each list has at least one entry (entries are only created by appending), so the
                # division is safe.
                'squeeze_factor': {
                    k: sum(v) / len(v) for k, v in self._squeeze_factor_by_hsi_event_name.items()
                }
            },
        )

        # Log summary of HSI_Events that never ran
        logger_summary.info(
            key="Never_ran_HSI_Event",
            description="Counts of the HSI_Events that never ran in this calendar year by TREATMENT_ID, "
                        "and the respective 'Appt_Type's that have not occurred in this calendar year.",
            data={
                "TREATMENT_ID": self._never_ran_treatment_ids,
                "Number_By_Appt_Type_Code": self._never_ran_appts,
                "Number_By_Appt_Type_Code_And_Level": self._never_ran_appts_by_level,
            },
        )

        logger_summary.info(
            key="Capacity",
            description="The fraction of all the healthcare worker time that is used each day, averaged over this "
                        "calendar year.",
            data={
                "average_Frac_Time_Used_Overall": np.mean(self._frac_time_used_overall),
                # <-- leaving space here for additional summary measures that may be needed in the future.
            },
        )

        # Log mean of 'fraction time used by officer type and facility level' from daily entries from the previous
        # year.
        logger_summary.info(
            key="Capacity_By_OfficerType_And_FacilityLevel",
            description="The fraction of healthcare worker time that is used each day, averaged over this "
                        "calendar year, for each officer type at each facility level.",
            data=flatten_multi_index_series_into_dict_for_logging(
                self.frac_time_used_by_officer_type_and_level()),
        )

        self._reset_internal_stores()

    def frac_time_used_by_officer_type_and_level(
        self,
        officer_type: Optional[str] = None,
        level: Optional[str] = None,
    ) -> Union[float, pd.Series]:
        """Average fraction of time used by officer type and level since last reset.

        If `officer_type` and/or `level` is not provided (left to default to `None`) then a pd.Series with a
        multi-index is returned giving the result for all officer_types/levels (restricted to whichever of the
        two was provided). If both are provided, a single number is returned.
        """
        # Use len(self._frac_time_used_overall) as proxy for number of days since the last reset.
        number_of_days = len(self._frac_time_used_overall)

        if (officer_type is not None) and (level is not None):
            return (
                self._sum_of_daily_frac_time_used_by_officer_type_and_level[officer_type, level]
                / number_of_days
            )

        # Return multiple in the form of a pd.Series with multiindex
        mean_frac_time_used = {
            (_officer_type, _level): v / number_of_days
            for (_officer_type, _level), v in self._sum_of_daily_frac_time_used_by_officer_type_and_level.items()
            if (_officer_type == officer_type or officer_type is None)
            and (_level == level or level is None)
        }
        return pd.Series(
            index=pd.MultiIndex.from_tuples(
                list(mean_frac_time_used.keys()),
                names=['OfficerType', 'FacilityLevel']
            ),
            data=list(mean_frac_time_used.values())
        ).sort_index()
[docs] class HealthSystemChangeParameters(Event, PopulationScopeEventMixin):
[docs] def __init__(self, module: HealthSystem, parameters: Dict):
[docs] def apply(self, population):
[docs] class DynamicRescalingHRCapabilities(RegularEvent, PopulationScopeEventMixin):
[docs] def __init__(self, module):
@property
def current_pop_size(self) -> float:
    """Current population size: the sum of the `is_alive` column of the population properties."""
    return self.sim.population.props.is_alive.sum()
[docs] def _get_most_recent_year_specified_for_a_change_in_configuration(self) -> int:
[docs] def apply(self, population):
[docs] class ConstantRescalingHRCapabilities(Event, PopulationScopeEventMixin):
[docs] def __init__(self, module):
[docs] def apply(self, population):
[docs] class RescaleHRCapabilities_ByDistrict(Event, PopulationScopeEventMixin):
[docs] def __init__(self, module):
[docs] def apply(self, population):
[docs] class HealthSystemChangeMode(RegularEvent, PopulationScopeEventMixin):
[docs] def __init__(self, module):
[docs] def apply(self, population):