Source code for groupedpaneldatamodels.model

# This file has all the main code of each of the models (based on linearmodels)
# The main logic of each model is implemented in their own file, this file provides the classes
# as the main code is functional and not object oriented


# Imports
# First local imports
from .models.ando_bai import (
    grouped_interactive_effects as ando_bai,
    grouped_interactive_effects_hetrogeneous as ando_bai_heterogeneous,
)
from .models.bonhomme_manresa import (
    grouped_fixed_effects as bonhomme_manresa,
    _compute_statistics as bm_compute_statistics,
)
from .models.su_ju import interactive_effects_estimation as su_ju
from .models.su_shi_phillips import fixed_effects_estimation as su_shi_phillips
from .information_criteria import compute_statistics

# Second standard library imports
from typing import Literal, Any
from copy import deepcopy, copy
from datetime import datetime
from time import process_time

# Third party imports
from statsmodels.iolib.summary import Summary
from statsmodels.iolib.table import SimpleTable
from numpy.typing import ArrayLike
from numpy.random import default_rng, SeedSequence
from scipy.stats import norm
from tqdm import tqdm
from joblib import Parallel, delayed

import numpy as np

# Commonly used shared functions (may also put them in utility.py)
# TBD

# Errors
# TBD

# NOTE pass RNG to the models


# Base Class
class _GroupedPanelModelBase:  # type:ignore
    """
    Base class for grouped panel data models.
    This class provides the basic structure and functionality for grouped panel data models.
    It is not meant to be instantiated directly, but rather to be subclassed by specific models.
    """

    def __init__(
        self,
        dependent: ArrayLike,
        exog: ArrayLike,
        use_bootstrap: bool = False,
        random_state=None,
        **kwargs,
    ):
        """
        Initialize the base class for grouped panel data models.

        This class sets up the core structure used by all grouped panel estimators, including
        the dependent variable, exogenous variables, bootstrap settings, and general configuration options.

        Parameters:
            dependent (ArrayLike): Dependent variable array, expected shape (N, T, 1).
            exog (ArrayLike): Exogenous variable array, expected shape (N, T, K).
            use_bootstrap (bool, optional): Whether to compute bootstrap-based standard errors. Defaults to False.
            random_state (int, optional): Seed for the random number generator. Defaults to None.
            **kwargs: Optional keyword arguments for advanced configuration.
                - hide_progressbar (bool): Whether to suppress progress bars during fitting. Defaults to False.
                - disable_analytical_se (bool): Whether to skip analytical standard error calculation. Defaults to False.
        """
        # TODO Voor nu alles omzetten naar een array, maar weet niet hoe handig dat altijd is
        # want je verliest wel de namen van de kolommen, misschien net als linearmodels een
        # aparte class hiervoor maken
        self.dependent = np.asarray(dependent)
        self.exog = np.asarray(exog)
        self._N, self._T, self._K = self.exog.shape  # type:ignore
        # Parallel‑safe random number generator
        self._rng = default_rng(random_state)
        self._random_state = random_state

        # Set up relevant information that needs to be stored
        self._use_bootstrap = use_bootstrap
        # self._original_index = self.dependent.index
        self._name = self.__class__.__name__
        self._fit_datetime = None  # Time when the model was fitted
        self._fit_start = None  # Start time for fitting the model
        self._fit_duration = None  # Duration of the fitting process
        self._model_type = None  # Type of model, can be used for identification
        self._params = None
        self._IC = None
        self._params_analytical_se = None
        self._params_bootstrap_se = None
        self._hide_progressbar = kwargs.pop("hide_progressbar", False)
        self._disable_analytical_se = kwargs.pop("disable_analytical_se", False)
        self._resid = None  # Residuals of the model, to be computed after fitting

        # TODO implement self._not_null (only if neccesary)
        self._validate_data()  # TODO implement this function

        # TODO implement cov_estimators

    def __str__(self) -> str:
        return f"{self._name} ({self._model_type}) \nShape exog: {self.exog.shape}\nShape dependent: {self.dependent.shape}\n"

    def __repr__(self) -> str:
        return self.__str__() + f"\nid: {hex(id(self))}"

    def _validate_data(self) -> None:
        # TODO not that relevant for now
        pass

    @property
    def N(self) -> int:
        """
        Returns the number of observations

        Returns
        -------
        int
            The number of observations
        """
        return self._N

    @property
    def T(self) -> int:
        """
        Returns the number of time periods

        Returns
        -------
        int
            The number of time periods
        """
        return self._T

    @property
    def K(self) -> int:
        """
        Returns the number of exogenous variables

        Returns
        -------
        int
            The number of exogenous variables
        """
        return self._K

    @property
    def params(self) -> dict:
        """
        Returns the parameters of the model

        Returns
        -------
        dict
            The parameters of the model
        """
        if self._params is None:
            raise ValueError("Model has not been fitted yet")
        return self._params

    @property
    def params_bootstrap_standard_errors(self) -> dict:
        """
        Returns the bootstrap standard errors of the parameters

        Returns
        -------
        dict | None
            The bootstrap standard errors of the parameters, or None if not available
        """
        if self._params_bootstrap_se is None:
            raise ValueError("Model has not been fitted yet or no bootstrap was used")
        return self._params_bootstrap_se

    @property
    def params_analytical_standard_errors(self) -> dict:
        """
        Returns the analytical standard errors of the parameters

        Returns
        -------
        dict | None
            The analytical standard errors of the parameters, or None if not available
        """
        if self._params_analytical_se is None:
            raise ValueError("Model has not been fitted yet or no analytical se was implemented")
        return self._params_analytical_se

    @property
    def params_standard_errors(self) -> dict:
        """
        Returns the standard errors of the parameters

        Returns
        -------
        dict | None
            The standard errors of the parameters, or None if not available
        """
        if not self._use_bootstrap:
            return self.params_analytical_standard_errors

        return self.params_bootstrap_standard_errors

    @property
    def t_values(self) -> dict:
        """
        Returns the t-values of the parameters

        Returns
        -------
        dict | None
            The t-values of the parameters, or None if not available
        """
        return {param: self.params[param] / se for param, se in self.params_standard_errors.items()}

    @property
    def resid(self) -> np.ndarray:
        """
        Returns the residuals of the model

        Returns
        -------
        ArrayLike
            The residuals of the model

        Raises
        ------
        ValueError
            If the model has not been fitted yet or residuals are not available for this model.
        """
        if self._resid is None:
            raise ValueError("Model has not been fitted yet or residuals are not available for this model")
        return self._resid

    def p_values(self) -> dict:
        """
        Returns the p-values of the parameters

        Returns
        -------
        dict | None
            The p-values of the parameters, or None if not available
        """
        return {param: 2 * (1 - norm.cdf(np.abs(t))) for param, t in self.t_values.items()}

    @property
    def IC(self) -> dict:
        """
        Returns the information criteria of the model

        Returns
        -------
        dict | None
            The information criteria of the model, or None if not available
        """
        if self._IC is None:
            raise ValueError("Model has not been fitted yet or IC values are not available for this model")
        return self._IC

    # TODO: F-stat, R^2, andere dingen

    # FIXME add more pre-fit checks if needed
    # e.g. check if the data is in the right format, if the dependent variable is a 3D array, etc.
    def _pre_fit(self):
        """
        Internal method to prepare the model for fitting.
        """
        self._fit_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self._fit_start = process_time()  # Start time for fitting the model

    def _post_fit(self):
        """
        Internal method to finalize the model after fitting.
        This method should be called after the model has been fitted.
        """
        assert self._fit_start is not None, "Fit start time is not set. Did you call _pre_fit()?"
        self._fit_duration = process_time() - self._fit_start  # Calculate the time taken to fit the model

    def fit(self) -> "_GroupedPanelModelBase":
        """
        Fit the grouped panel data model to the provided dataset.

        This method should be implemented by subclasses of `_GroupedPanelModelBase`. It defines
        the main estimation routine and is responsible for setting the model's fitted parameters,
        residuals, and any diagnostics such as information criteria or standard errors.

        Returns:
            _GroupedPanelModelBase: The fitted model instance.

        Raises:
            NotImplementedError: This base method must be overridden by a subclass.
        """
        # TODO implement this function
        raise NotImplementedError("Fit function not implemented yet")

    def _get_bootstrap_standard_errors(
        self, params: tuple[str], n_boot: int = 50, require_deepcopy=False, n_jobs=-1, **kwargs
    ):
        """
        Computes bootstrap standard errors for the parameters.

        Parameters
        ----------
        params: tuple[str], required, displays which parameters to compute the bootstrap standard errors for
        n_boot: int, optional, the number of bootstrap samples to use, default is 50
        require_deepcopy: bool, optional, whether to require a deepcopy of the model, default is False
        **kwargs: Additional keyword arguments that can be passed to the model fitting function.

        Returns
        -------
        dict
            The confidence intervals for the parameters
        """

        if not self._use_bootstrap:
            return None

        # Prepare parallel bootstrap estimations using joblib
        seed_seq = SeedSequence(self._random_state)
        child_seqs = seed_seq.spawn(n_boot)
        rngs = [default_rng(s) for s in child_seqs]

        def _bootstrap_iteration(rng):
            model_copy = deepcopy(self) if require_deepcopy else copy(self)
            model_copy._use_bootstrap = False
            model_copy._disable_analytical_se = True
            sample = rng.choice(self.N, replace=True, size=self.N)
            model_copy._rng = rng
            model_copy.dependent = self.dependent[sample, :, :]
            model_copy.exog = self.exog[sample, :, :]
            return model_copy.fit(**kwargs).params

        estimations = Parallel(n_jobs=n_jobs)(
            delayed(_bootstrap_iteration)(rng) for rng in tqdm(rngs, disable=self._hide_progressbar)
        )

        self._bootstrap_estimations = estimations
        self._params_bootstrap_se = {}

        # FIXME standard errors are only correct for beta
        # a solution has to be computed for the other parameters
        for p in params:
            se = np.std([estimation[p] for estimation in estimations], axis=0, ddof=1)  # type:ignore
            self._params_bootstrap_se[p] = se

    def _get_analytical_standard_errors(self):
        """
        Computes analytical standard errors for the parameters.
        This function is a placeholder and should be implemented in subclasses.

        Returns
        -------
        dict
            The analytical standard errors for the parameters
        """
        # TODO implement this function
        raise NotImplementedError("Analytical standard errors function not implemented yet")

    def get_confidence_intervals(
        self, confidence_level: float = 0.95, conf_type: Literal["auto", "bootstrap", "analytical"] = "auto"
    ) -> dict:
        """
        Returns the confidence intervals for the parameters, prefers bootstrap if available, otherwise analytical

        Parameters
        ----------
        confidence_level: float, optional, the confidence level for the intervals, default is 0.95
        conf_type: str, optional, the type of confidence intervals to compute, can be 'auto', 'bootstrap', or 'analytical', default is 'auto'

        Returns
        -------
        dict
            The confidence intervals for the parameters
        """
        if self._params is None:
            raise ValueError("Model has not been fitted yet")

        ci = {}
        z = norm.ppf((1 + confidence_level) / 2)  # z-score for the given confidence level

        if conf_type == "auto":
            for param, se in self.params_standard_errors.items():
                ci[param] = (self._params[param] - z * se, self._params[param] + z * se)
        elif conf_type == "bootstrap":
            for param, se in self.params_bootstrap_standard_errors.items():
                ci[param] = (self._params[param] - z * se, self._params[param] + z * se)
        elif conf_type == "analytical":
            for param, se in self.params_analytical_standard_errors.items():
                ci[param] = (self._params[param] - z * se, self._params[param] + z * se)
        else:
            raise ValueError("conf_type must be one of 'auto', 'bootstrap', or 'analytical'")

        return ci

    def predict(
        self,
        params: ArrayLike,
        *,
        exog: ArrayLike | None = None,
        data: ArrayLike | None = None,
    ) -> ArrayLike:
        """
        Predicts the dependent variable based on the parameters and exogenous variables
        This function is a placeholder and should be implemented in subclasses.

        Parameters
        ----------
        params: array_like
            The parameters
        exog: array_like
            The exogenous variables

        Returns
        -------
        array_like
            The predicted dependent variable
        """
        if exog is None:
            exog = self.exog

        # TODO implement this function
        raise NotImplementedError("Predict function not implemented yet")

    def to_dict(self, store_bootstrap_iterations=False) -> dict[str, Any]:
        """
        Converts the model to a dictionary,
        which can be useful for serialization or inspection.

        Parameters
        ----------
        store_bootstrap_iterations: bool, optional, whether to store the bootstrap iterations in the dictionary, default is False, requires a lot of memory
        If set to True, the bootstrap estimations will be included in the dictionary.

        Returns
        -------
        dict
            The model as a dictionary
        """
        return {
            "name": self._name,
            "id": hex(id(self)),
            "fit_datetime": self._fit_datetime,
            "fit_duration": self._fit_duration,
            "model_type": self._model_type,
            "params": self._params,
            "IC": self._IC,
            "use_bootstrap": self._use_bootstrap,
            "bootstrap_estimations": (
                self._bootstrap_estimations
                if store_bootstrap_iterations and hasattr(self, "_bootstrap_estimations")
                else None
            ),
            "bootstrap_se": self._params_bootstrap_se if hasattr(self, "_params_bootstrap_se") else None,
            "analytical_se": self._params_analytical_se if hasattr(self, "_params_analytical_se") else None,
            "bootstrap_conf_interval": (
                self.get_confidence_intervals(conf_type="bootstrap") if self._params_bootstrap_se is not None else None
            ),
            "analytical_conf_interval": (
                self.get_confidence_intervals(conf_type="analytical")
                if self._params_analytical_se is not None
                else None
            ),
            "N": self.N,
            "T": self.T,
            "K": self.K,
        }

    @staticmethod
    def _show_float(value: float, precision: int = 4) -> str:
        """Formats a float value to a string with a specified precision.

        Args:
            value (float): The float value to format.
            precision (int, optional): The precision that is required. Defaults to 4.

        Returns:
            str: The formatted string representation of the float value.
        """
        try:
            return f"{value:.{precision}f}" if not np.isnan(value) else "N/A"
        except Exception:
            return str(value)

    # FIXME add flexibility to choose between bootstrap and analytical standard errors
    def summary(
        self,
        confidence_level: float = 0.95,
        standard_errors: Literal["auto", "bootstrap", "analytical"] = "auto",
    ):
        """
        Generates a summary of the model, including information about the model type, observations, exogenous variables,
        groups, fit time, fit duration, and standard errors.

        Args:
            confidence_level (float, optional): The preferred confidence lebel . Defaults to 0.95.
            standard_errors (Literal["auto", "bootstrap", "analytical"], optional): Which type of errors to prefer. Defaults to "auto".
            If "auto", it will use bootstrap if available, otherwise analytical standard errors. All other values will raise a NotImplementedError.
            As this is not implemented yet.

        Raises:
            ValueError: Model has not been fitted yet
            NotImplementedError: Standard errors type other than 'auto' is not implemented yet, you can manually view them using `model.params_bootstrap_standard_errors` or `model.params_analytical_standard_errors`_

        Returns:
            Summary: A summary object containing the model information, parameters, and standard errors.
        """
        # Ensure the model has been fitted
        if self._params is None:
            raise ValueError("Model has not been fitted yet")

        if standard_errors != "auto":
            raise NotImplementedError(
                "Standard errors type other than 'auto' is not implemented yet"
                + " you can manually view them using `model.params_bootstrap_standard_errors` or `model.params_analytical_standard_errors`"
            )

        # INFORMATION HEADERS
        left = [
            ["Type", f"{self._model_type}"],
            ["Observations", self.N * self.T],
            ["Exogenous Variables", self.K],
            ["Groups", self.G if hasattr(self, "G") else "N/A"],  # type:ignore
            ["Fit Time", self._fit_datetime],
            ["Fit Duration", f"{self._fit_duration:.2f} seconds"],
            [
                "Hetrogeneous Beta",
                self.heterogeneous_beta if hasattr(self, "heterogeneous_beta") else "N/A",  # type:ignore
            ],
        ]

        ic = self._IC if self._IC is not None else {}

        right = [
            ["AIC", self._show_float(ic.get("AIC", float("nan")))],
            ["BIC", self._show_float(ic.get("BIC", float("nan")))],
            ["HQIC", self._show_float(ic.get("HQIC", float("nan")))],
            ["sigma^2", self._show_float(ic.get("sigma^2", float("nan")))],
            ["Seed", self._random_state if self._random_state is not None else "N/A"],
            ["Standard Error type", "Bootstrap" if self._use_bootstrap else "Analytical"],
            ["Confidence Level", self._show_float(confidence_level)],
        ]

        top = [left + right for left, right in zip(left, right)]
        headers_top = ["Left", "Value", "Right", "Value"]

        summary = Summary()

        summary.tables.append(SimpleTable(top, title=f"{self._name} Summary"))

        # Coef.	Std.Err.	t	P>|t|	[0.025	0.975]
        headers_params = [
            "Parameter",
            "Coef.",
            "Std.Err.",
            "t",
            "P>|t|",
            f"[{(1 - confidence_level)/2:.3f}",
            f"{(1 - (1 - confidence_level)/2):.3f}]",
        ]
        # PARAMETERS TABLE

        if self._params_bootstrap_se is not None or self._params_analytical_se is not None:
            prev_first_idx = None  # Tracks the first index of the previous parameter entry

            for param in self.params_standard_errors.keys():
                params_values = []
                for idx, v in np.ndenumerate(self.params[param]):
                    se = self.params_standard_errors[param][idx]
                    t_value = self.t_values[param][idx]
                    p_value = self.p_values()[param][idx]
                    ci_lower = self.get_confidence_intervals(confidence_level)[param][0][idx]
                    ci_upper = self.get_confidence_intervals(confidence_level)[param][1][idx]

                    # Insert empty row if first index changes
                    first_idx = idx[0] if len(idx) > 0 else None
                    if prev_first_idx is not None and first_idx != prev_first_idx:
                        params_values.append([""] * len(headers_params))

                    prev_first_idx = first_idx

                    # Format row
                    row = [
                        param + str(idx).replace("(", "").replace(")", "").replace(" ", ""),
                        self._show_float(v),
                        self._show_float(se),
                        self._show_float(t_value),
                        self._show_float(p_value),
                        self._show_float(ci_lower),
                        self._show_float(ci_upper),
                    ]
                    params_values.append(row)

                params_se_table = SimpleTable(
                    params_values,
                    headers=headers_params,
                    title=f"{param}",
                )
                summary.tables.append(params_se_table)

        for param in self.params.keys():
            if (
                (self._params_bootstrap_se is not None) or (self._params_analytical_se is not None)
            ) and param in self.params_standard_errors.keys():
                continue

            if self.params[param] is None:
                continue

            if isinstance(self.params[param], dict):
                table = SimpleTable(
                    [[a, b] for a, b in self.params[param].items()], title=f"{param} coef.", headers=["Index", "Value"]
                )
                summary.tables.append(table)

            elif self.params[param].ndim == 1:
                data = self.params[param].round(4)
                table_data = [["Value"] + data.tolist()]
                headers = ["Index"] + [f"{i}" for i in range(len(data))]
                table = SimpleTable(
                    table_data,
                    title=f"{param} coef.",
                    headers=headers,
                )
                summary.tables.append(table)
            else:
                data = self.params[param].round(4).T
                index_column = [[f"{i}"] for i in range(data.shape[0])]
                table_data = [row + value for row, value in zip(index_column, data.tolist())]
                table = SimpleTable(
                    table_data,
                    title=f"{param} coef.",
                    headers=[f"{param}"] + [f"{i}" for i in range(data.shape[0])],
                )
                # If no standard errors are available, just show the parameter values
                summary.tables.append(table)
                # break  # Only show the first parameter if no standard errors are available

        return summary


[docs] class GroupedFixedEffects(_GroupedPanelModelBase): """ GroupedFixedEffects Class for estimating grouped fixed effects in panel data models. Supports both Bonhomme and Manresa (2015) and Su, Shi, and Phillips (2016) estimators. This class clusters units into a specified number of latent groups and estimates either: - Group-specific slope coefficients (heterogeneous), or - A shared slope coefficient (homogeneous) depending on the specified configuration. It also optionally includes individual (entity-specific) fixed effects and supports bootstrap inference. Typical usage: >>> model = GroupedFixedEffects(y, x, G=3) >>> model.fit(max_iter=100) >>> model.summary() Attributes such as coefficients, group assignments, and residuals are made available after fitting. """ def __init__( self, dependent: ArrayLike, exog: ArrayLike, G: int, use_bootstrap: bool = False, model: Literal["bonhomme_manresa", "su_shi_phillips"] = "bonhomme_manresa", heterogeneous_beta: bool = True, entity_effects: bool = False, kappa: float = 0.1, **kwargs, ): """ Initialize the GroupedFixedEffects model for panel data analysis. This class estimates grouped fixed effects using either the Bonhomme and Manresa (2015) or Su, Shi, and Phillips (2016) estimators. The model is designed for settings where units can be grouped based on similarities in their fixed effects or slope coefficients. Parameters: dependent (np.ndarray): A 3D array of Y, structured as (N, T, 1), where N is the number of individuals, T is the number of time periods. exog (np.ndarray): A 3D array of X, structured as (N, T, K), containing the exogenous regressors. G (int): The (maximum) number of latent groups to estimate. use_bootstrap (bool, optional): Whether or not to estimate the standard errors using the Bootstrap method. Defaults to False. model (Literal["bonhomme_manresa", "su_shi_phillips"], optional): Which estimator to use: "bonhomme_manresa" (default) or "su_shi_phillips". heterogeneous_beta (bool, optional): Whether the coefficients β should be allowed to differ across groups. If False, they are homogeneous. Defaults to True. entity_effects (bool, optional): Whether to include individual (entity-specific) fixed effects in the estimation. Recommended for "su_shi_phillips". Defaults to False. kappa (float, optional): Regularization strength for SCAD penalty. Default is 0.1. Raises: ValueError: If the specified model is not supported. """ super().__init__(dependent, exog, use_bootstrap, **kwargs) self._entity_effects = entity_effects self._model_type = model if self._model_type not in ["bonhomme_manresa", "su_shi_phillips"]: raise ValueError("Model must be either 'bonhomme_manresa' or 'su_shi_phillips'") if self._model_type == "bonhomme_manresa": if kappa != 0.1: # NOTE 0.1 is the default value for kappa raise ValueError("kappa is not supported for the Bonhomme and Manresa model") self._kappa = kappa # Regularization strength for SCAD penalty, only used in Su and Shi Phillips self.G = int(G) self.heterogeneous_beta = heterogeneous_beta def _fit_bm(self, n_boot: int = 50, **kwargs): """ Fits the Bonhomme and Manresa model to the data Parameters ---------- n_boot: int The number of bootstrap samples to use Returns ------- self The fitted model """ boot_n_jobs = kwargs.pop("boot_n_jobs", -1) # type:ignore b, beta, g, eta, iterations, objective_value, resid = bonhomme_manresa( self.dependent, self.exog, self.G, hetrogeneous_theta=self.heterogeneous_beta, unit_specific_effects=self._entity_effects, **kwargs, ) # Create dictionary mapping group number to list of individuals g_members = {int(group): np.where(g == group)[0].tolist() for group in np.unique(g)} self._params = {"beta": b.T, "alpha": beta, "g": g_members, "eta": eta} self._resid = resid # Store the residuals assert resid is not None, "Residuals must be computed before calculating standard errors" num_params = self.G * self.T + self.N + self.K self._IC = compute_statistics(self.N * self.T, num_params, resid, include_hqic=True) self._get_analytical_standard_errors() self._get_bootstrap_standard_errors(("beta",), n_boot=n_boot, n_jobs=boot_n_jobs, **kwargs) self._post_fit() # Set the fit duration and datetime return self def _fit_ssp(self, n_boot: int = 50, **kwargs): """ Fits the Su and Shi Phillips model to the data Parameters ---------- n_boot: int The number of bootstrap samples to use Returns ------- self The fitted model """ if self.heterogeneous_beta is False: raise ValueError("Homogeneous beta is not supported for the Su and Shi Phillips model") boot_n_jobs = kwargs.pop("boot_n_jobs", -1) # type:ignore b, alpha, beta, resid = su_shi_phillips( np.squeeze(self.dependent), self.exog, self.N, self.T, self.K, self.G, use_individual_effects=self._entity_effects, kappa=self._kappa, **kwargs, ) self._params = {"beta": beta.T, "b": b, "alpha": alpha} self._resid = resid # Store the residuals # Get groupings b = b.T beta = beta.T dists = np.linalg.norm(b[:, None, :] - beta[None, :, :], axis=2) g = np.argmin(dists, axis=1) g_members = {int(group): np.where(g == group)[0].tolist() for group in np.unique(g)} self._params["g"] = g_members num_params = np.unique_counts(np.round(np.concat([b.ravel(), beta.ravel(), alpha.ravel()]), 2)).counts.sum() self._IC = compute_statistics(self.N * self.T, num_params, resid, include_hqic=True) self._get_analytical_standard_errors() self._get_bootstrap_standard_errors(("beta",), n_boot=n_boot, n_jobs=boot_n_jobs, **kwargs) self._post_fit() # Set the fit duration and datetime return self
[docs] def fit(self, **kwargs): """ Fit the GroupedFixedEffects model to the data. This method estimates grouped fixed effects based on the selected model type: - "bonhomme_manresa" implements the algorithm by Bonhomme and Manresa (2015). - "su_shi_phillips" implements the algorithm by Su, Shi, and Phillips (2016). Parameters ---------- n_boot : int, optional Number of bootstrap replications to compute standard errors. Default is 50. For model='bonhomme_manresa': max_iter : int, optional Maximum number of optimization iterations. Default is 10000. tol : float, optional Convergence tolerance for optimization. Default is 1e-6. gfe_iterations : int, optional Number of different starting values for the grouped fixed effects algorithm. Default is 100. enable_vns : bool, optional Whether to use the Variable Neighborhood Search (VNS) algorithm. Default is False. (Not recommended when `heterogeneous_beta=True` due to computational cost.) For model='su_shi_phillips': max_iter : int, optional Maximum number of optimization iterations. Default is 1000. tol : float, optional Convergence tolerance for optimization. Default is 1e-6. only_bfgs : bool, optional Whether to use only the L-BFGS optimizer. If False, alternates between L-BFGS and Nelder-Mead. Default is True. Returns ------- self : GroupedFixedEffects The fitted model instance. """ self._pre_fit() n_boot = kwargs.pop("n_boot", 50) if self._model_type == "bonhomme_manresa": return self._fit_bm(n_boot=n_boot, **kwargs) elif self._model_type == "su_shi_phillips": return self._fit_ssp(n_boot=n_boot, **kwargs) raise ValueError("Model must be either 'bonhomme_manresa' or 'su_shi_phillips'")
# FIXME this could be more generalized I believe def _get_analytical_standard_errors_bm(self): """ Computes analytical standard errors for the Bonhomme and Manresa model. """ if self._params is None: raise ValueError("Model has not been fitted yet") if self._model_type != "bonhomme_manresa": raise ValueError("This function is only applicable for the Bonhomme and Manresa model") se_alpha = np.zeros((self.G, self.T)) # type:ignore se_beta = np.zeros((self.G, self.K)) # type:ignore g = self.params["g"] if self.heterogeneous_beta else {0: list(range(self.N))} resid = self._resid # type:ignore assert resid is not None, "Residuals must be computed before calculating standard errors" for gamma in g.keys(): se_alpha[gamma] = np.sqrt((resid[g[gamma]] ** 2).mean(axis=0)) self._params_analytical_se = {"alpha": se_alpha} for gamma in g.keys(): x = self.exog[g[gamma]] x_bar = self.exog[g[gamma]].mean(axis=0) x_diff = x - x_bar[np.newaxis, :, :] total_sum_sigma = np.zeros((self.K, self.K)) for i in range(x.shape[0]): for t in range(self.T): total_sum_sigma += x_diff[i, t, np.newaxis].T @ x_diff[i, t, np.newaxis] sigma_beta_g = total_sum_sigma / (x.shape[0] * self.T) resid_g = resid[g[gamma]] total_sum_omega = np.zeros((self.K, self.K)) for i in range(x.shape[0]): for t in range(self.T): for s in range(self.T): diff_t = x_diff[i, t, np.newaxis] diff_s = x_diff[i, s, np.newaxis] total_sum_omega += resid_g[i, t] * resid_g[i, s] * (diff_t.T @ diff_s) omega_beta_g = total_sum_omega / (x.shape[0] * self.T) var_beta_g = ( np.linalg.inv(sigma_beta_g) @ omega_beta_g @ np.linalg.inv(sigma_beta_g) / (x.shape[0] * self.T) ) se_beta[gamma] = np.sqrt(np.diag(var_beta_g)) self._params_analytical_se["beta"] = se_beta def _get_analytical_standard_errors_ssp(self): """ Computes analytical standard errors for the Su and Shi Phillips model. """ if self._params is None: raise ValueError("Model has not been fitted yet") if self._model_type != "su_shi_phillips": raise ValueError("This function is only applicable for the Su and Shi Phillips model") resid = self._resid # type:ignore assert resid is not None, "Residuals must be computed before calculating standard errors" self._params_analytical_se = {} if self._entity_effects: se_alpha = np.atleast_2d(np.sqrt((resid**2).mean(axis=1))).T self._params_analytical_se["alpha"] = se_alpha g = self.params["g"] se_beta = np.zeros((self.G, self.K)) # type:ignore for gamma in self.params["g"].keys(): x = self.exog[g[gamma]] x_bar = self.exog[g[gamma]].mean(axis=0) x_diff = x - x_bar[np.newaxis, :, :] total_sum_sigma = np.zeros((self.K, self.K)) for i in range(x.shape[0]): for t in range(self.T): total_sum_sigma += x_diff[i, t, np.newaxis].T @ x_diff[i, t, np.newaxis] sigma_beta_g = total_sum_sigma / (x.shape[0] * self.T) resid_g = resid[g[gamma]] total_sum_omega = np.zeros((self.K, self.K)) for i in range(x.shape[0]): for t in range(self.T): for s in range(self.T): diff_t = x_diff[i, t, np.newaxis] diff_s = x_diff[i, s, np.newaxis] total_sum_omega += resid_g[i, t] * resid_g[i, s] * (diff_t.T @ diff_s) omega_beta_g = total_sum_omega / (x.shape[0] * self.T) var_beta_g = ( np.linalg.inv(sigma_beta_g) @ omega_beta_g @ np.linalg.inv(sigma_beta_g) / (x.shape[0] * self.T) ) se_beta[gamma] = np.sqrt(np.diag(var_beta_g)) self._params_analytical_se["beta"] = se_beta def _get_analytical_standard_errors(self): """ Computes analytical standard errors for the parameters. Note: this function will always be called, as the analytical standard errors are always computed Note: this function will only compute the analytical standard errors, it does not return anything Returns ------- None """ if self._disable_analytical_se: return elif self._model_type == "bonhomme_manresa": return self._get_analytical_standard_errors_bm() elif self._model_type == "su_shi_phillips": return self._get_analytical_standard_errors_ssp() raise NotImplementedError("Analytical standard errors function not implemented yet for this model")
[docs] class GroupedInteractiveFixedEffects(_GroupedPanelModelBase): """ GroupedInteractiveFixedEffects Class for estimating grouped interactive fixed effects in panel data. Implements the estimators by Ando and Bai (2016) and Su and Ju (2018). This class extends the grouped fixed effects framework by allowing for interactive effects (latent factors and loadings) that vary across groups. It is suitable for capturing unobserved heterogeneity that follows a low-rank factor structure within each group. It supports both heterogeneous and homogeneous slope coefficients (only for Ando & Bai, 2016). Example usage: >>> model = GroupedInteractiveFixedEffects(y, x, G=3) >>> model.fit() >>> model.summary() After fitting, the model provides access to estimated coefficients, group assignments, latent factors, and residual diagnostics. """ def __init__( self, dependent: ArrayLike, exog: ArrayLike, G: int, use_bootstrap: bool = False, model: Literal["ando_bai", "su_ju"] = "ando_bai", GF: ArrayLike | None = None, R: int | None = None, heterogeneous_beta: bool = True, **kwargs, ): """ Initialize the GroupedInteractiveFixedEffects model for panel data analysis. This model estimates grouped interactive fixed effects using either the Ando and Bai (2016) or the Su and Ju (2018) estimators. It allows for heterogeneity in slope coefficients across groups and supports group-specific factor structures to model unobserved interactive effects. Parameters ---------- dependent : ArrayLike A 3D array representing the dependent variable with shape (N, T, 1), where N is the number of individuals and T is the number of time periods. exog : ArrayLike A 3D array representing the exogenous variables with shape (N, T, K), where K is the number of regressors. G : int The number of latent groups to estimate. use_bootstrap : bool, optional Whether to compute bootstrap-based standard errors. Default is False. model : Literal["ando_bai", "su_ju"], optional The estimator to use. Choose between "ando_bai" (default) or "su_ju". GF : ArrayLike, optional An array specifying the number of latent factors for each group. If not provided, a single factor is assumed for each group. R : int, optional The total number of latent factors in the Su and Ju model. If not specified, defaults to G. heterogeneous_beta : bool, optional Whether to allow slope coefficients to vary across groups. Default is True. **kwargs : dict Additional configuration arguments, such as: - hide_progressbar (bool): Whether to suppress progress bars. - disable_analytical_se (bool): Whether to skip analytical SE calculation. """ super().__init__(dependent, exog, use_bootstrap, **kwargs) self._model_type = model if self._model_type not in ["ando_bai", "su_ju"]: raise ValueError("Model must be either 'ando_bai' or 'su_ju'") self.G = int(G) self.GF = ( GF if GF is not None else np.ones(G, dtype=int) ) # NOTE if GF is not defined, we assume all groups have one factor self.R = R if R is not None else self.G # Number of factors, default to G self.heterogeneous_beta = heterogeneous_beta # FIXME best to change this into multiple functions
[docs] def fit(self, **kwargs): """ Fit the GroupedInteractiveFixedEffects model to the data. This method estimates grouped interactive fixed effects using the selected estimator: - "ando_bai" (Ando & Bai, 2016): Supports both homogeneous and heterogeneous slope coefficients across groups. - "su_ju" (Su & Ju, 2018): Supports only heterogeneous slopes and models a shared factor structure across units. It estimates group assignments, coefficients, and latent interactive components, and computes information criteria and standard errors. Parameters ---------- n_boot : int, optional Number of bootstrap replications to compute standard errors. Default is 50. boot_n_jobs : int, optional Number of parallel jobs for the bootstrap procedure. Default is -1 (use all processors). kwargs : dict, optional Additional arguments passed to the underlying estimator routines. Returns ------- self : GroupedInteractiveFixedEffects The fitted model instance. Raises ------ ValueError If the selected model is not one of "ando_bai" or "su_ju", or if homogeneous beta is requested for "su_ju", which is not supported. """ self._pre_fit() n_boot = kwargs.pop("n_boot", 50) boot_n_jobs = kwargs.pop("boot_n_jobs", -1) # type:ignore assert self.GF is not None, "GF must be defined for the model" assert isinstance(self.GF, np.ndarray), "GF must be a numpy array" if self._model_type == "ando_bai": if self.heterogeneous_beta: # Use the heterogeneous version of the Ando and Bai model beta, g, F, Lambda, objective_value, resid = ando_bai_heterogeneous( self.dependent, self.exog, self.G, self.GF, **kwargs ) else: # Use the standard Ando and Bai model beta, g, F, Lambda, objective_value, resid = ando_bai( self.dependent, self.exog, self.G, self.GF, **kwargs ) # Create dictionary mapping group number to list of individuals g_members = {int(group): np.where(g == group)[0].tolist() for group in np.unique(g)} self._params = {"beta": beta.T, "g": g_members, "F": F.T, "Lambda": Lambda} num_params = self.G * self.T + self.GF.sum() + self.N + self.K self._resid = resid # Store the residuals self._IC = compute_statistics(self.N * self.T, num_params, resid, include_hqic=True) self._get_analytical_standard_errors() self._get_bootstrap_standard_errors( ("beta",), n_boot=n_boot, n_jobs=boot_n_jobs, **kwargs # type:ignore ) self._post_fit() # Set the fit duration and datetime return self elif self._model_type == "su_ju": if self.heterogeneous_beta == False: raise ValueError("Homogeneous beta is not supported for the Su and Ju model") b, beta, lambdas, factors, resid = su_ju( self.dependent, self.exog, self.N, self.T, self.K, self.G, R=self.R, **kwargs ) self._params = {"b": b, "beta": beta.T, "lambdas": lambdas, "factors": factors.T} # Get groupings _b = b.T _beta = beta.T dists = np.linalg.norm(_b[:, None, :] - _beta[None, :, :], axis=2) g = np.argmin(dists, axis=1) g_members = {int(group): np.where(g == group)[0].tolist() for group in np.unique(g)} self._params["g"] = g_members self._resid = resid num_params = np.unique_counts(np.concat([b.ravel(), beta.ravel(), lambdas.ravel()])).counts.sum() self._IC = compute_statistics(self.N * self.T, num_params, resid, include_hqic=True) self._get_analytical_standard_errors() self._get_bootstrap_standard_errors(("beta",), n_boot=n_boot, n_jobs=boot_n_jobs, **kwargs) self._post_fit() # Set the fit duration and datetime return self raise ValueError("Model must be either 'ando_bai' or 'su_ju'")
# FIXME this code does not actually compute the analytical standard errors for the Ando and Bai model # as it computes the Bonhomme and Manresa model standard errors def _get_analytical_standard_errors_ab(self): """ Computes analytical standard errors for the Bonhomme and Manresa model. """ if self._params is None: raise ValueError("Model has not been fitted yet") if self._model_type != "ando_bai": raise ValueError("This function is only applicable for the Bonhomme and Manresa model") se_beta = np.zeros((self.G, self.K)) # type:ignore g = self.params["g"] if self.heterogeneous_beta else {0: list(range(self.N))} resid = self._resid # type:ignore assert resid is not None, "Residuals must be computed before calculating standard errors" self._params_analytical_se = {} for gamma in g.keys(): assert isinstance(self.GF, np.ndarray), "Group members must be a list of indices" GF = np.array(self.GF, dtype=int) x = self.exog[g[gamma]] # F = self.params["F"] # F_g = F[:, self.GF[:gamma].sum() : self.GF[: gamma + 1].sum()].T # M_F = np.identity(self.T) - F_g @ np.linalg.pinv(F_g.T @ F_g) @ F_g.T # Lambda = self.params["Lambda"] # Lambda_g = Lambda[GF[:gamma].sum() : GF[: gamma + 1].sum(), :].T # N_g = len(g[gamma]) # z_init = M_F @ x # z_sub = np.zeros_like(z_init) # for i in range(N_g): # for j in range(N_g): # c = Lambda_g[i].T @ np.linalg.pinv(Lambda_g.T @ Lambda_g) @ Lambda_g[j] # z_sub[i] = c * z_init[j] # z = z_init - z_sub # x_diff = z x_bar = self.exog[g[gamma]].mean(axis=0) x_diff = x - x_bar[np.newaxis, :, :] total_sum_sigma = np.zeros((self.K, self.K)) for i in range(x.shape[0]): for t in range(self.T): total_sum_sigma += x_diff[i, t, np.newaxis].T @ x_diff[i, t, np.newaxis] sigma_beta_g = total_sum_sigma / (x.shape[0] * self.T) resid_g = resid[g[gamma]] total_sum_omega = np.zeros((self.K, self.K)) for i in range(x.shape[0]): for t in range(self.T): for s in range(self.T): diff_t = x_diff[i, t, np.newaxis] diff_s = x_diff[i, s, np.newaxis] total_sum_omega += resid_g[i, t] * resid_g[i, s] * (diff_t.T @ diff_s) omega_beta_g = total_sum_omega / (x.shape[0] * self.T) var_beta_g = ( np.linalg.inv(sigma_beta_g) @ omega_beta_g @ np.linalg.inv(sigma_beta_g) / (x.shape[0] * self.T) ) se_beta[gamma] = np.sqrt(np.diag(var_beta_g)) self._params_analytical_se["beta"] = se_beta def _get_analytical_standard_errors_sj(self): """ Computes analytical standard errors for the Su and Shi Phillips model. """ if self._params is None: raise ValueError("Model has not been fitted yet") if self._model_type != "su_ju": raise ValueError("This function is only applicable for the Su and Shi Phillips model") resid = self._resid # type:ignore assert resid is not None, "Residuals must be computed before calculating standard errors" self._params_analytical_se = {} g = self.params["g"] se_beta = np.zeros((self.G, self.K)) # type:ignore for gamma in self.params["g"].keys(): x = self.exog[g[gamma]] x_bar = self.exog[g[gamma]].mean(axis=0) x_diff = x - x_bar[np.newaxis, :, :] total_sum_sigma = np.zeros((self.K, self.K)) for i in range(x.shape[0]): for t in range(self.T): total_sum_sigma += x_diff[i, t, np.newaxis].T @ x_diff[i, t, np.newaxis] sigma_beta_g = total_sum_sigma / (x.shape[0] * self.T) resid_g = resid[g[gamma]] total_sum_omega = np.zeros((self.K, self.K)) for i in range(x.shape[0]): for t in range(self.T): for s in range(self.T): diff_t = x_diff[i, t, np.newaxis] diff_s = x_diff[i, s, np.newaxis] total_sum_omega += resid_g[i, t] * resid_g[i, s] * (diff_t.T @ diff_s) omega_beta_g = total_sum_omega / (x.shape[0] * self.T) var_beta_g = ( np.linalg.inv(sigma_beta_g) @ omega_beta_g @ np.linalg.inv(sigma_beta_g) / (x.shape[0] * self.T) ) se_beta[gamma] = np.sqrt(np.diag(var_beta_g)) self._params_analytical_se["beta"] = se_beta def _get_analytical_standard_errors(self): """ Computes analytical standard errors for the parameters. Note: this function will always be called, as the analytical standard errors are always computed Returns ------- dict The analytical standard errors for the parameters """ if self._disable_analytical_se: return elif self._model_type == "ando_bai": return self._get_analytical_standard_errors_ab() elif self._model_type == "su_ju": return self._get_analytical_standard_errors_sj() raise NotImplementedError("Analytical standard errors function not implemented yet for this model")