Source code for groupedpaneldatamodels.models.su_ju

import numpy as np
import scipy.optimize as opt

from numpy.linalg import lstsq, eigh, eigvalsh
from numba import njit
from sklearn.cluster import KMeans


@njit(fastmath=True)
def _base(y, x, beta, N, T, R):
    res = (y - np.sum(x * beta.T[:, None, :], axis=2)).T
    v_res = (res @ res.T) / N
    return eigvalsh(v_res)[:-R].sum() / T


@njit(fastmath=True)
def _norm(beta, alpha):
    alpha_norm2 = np.sum(alpha * alpha, axis=0)  # (n,)
    beta_norm2 = np.sum(beta * beta, axis=0)  # (m,)

    # all pairwise dot products
    dot = beta.T @ alpha  # (m, n)

    # expand to (m,1) and (1,n) for broadcasting
    d2 = beta_norm2.reshape((-1, 1)) + alpha_norm2.reshape((1, -1)) - 2.0 * dot

    # clamp tiny negatives from round-off
    d2 = np.maximum(d2, 0.0)

    return np.sqrt(d2)


@njit(fastmath=True)
def _row_prod(a):
    """Row-wise product for 2-D array `a` – returns shape (a.shape[0],)."""
    m, n = a.shape
    out = np.empty(m, dtype=a.dtype)
    for i in range(m):
        p = 1.0
        for j in range(n):
            p *= a[i, j]
        out[i] = p
    return out


@njit(fastmath=True)
def _objective_value_without_individual_effects(y, x, beta, alpha, kappa, N, R, T):
    base = _base(y, x, beta, N, T, R)
    penalty = np.mean(_row_prod(_norm(beta, alpha))) * kappa
    return base + penalty


def _generate_initial_estimates(y, x, N, T, K, G):
    beta_init = np.zeros((K, N))

    for i in range(N):
        beta_init[:, i : i + 1] = lstsq(x[i].reshape(T, K), y[i].reshape(T, 1))[0]
    alpha_init = KMeans(n_clusters=G).fit(beta_init.T).cluster_centers_.T

    for j in range(G):
        if np.abs(beta_init.T - alpha_init[:, j]).min() < 1e-2:
            alpha_init[:, j] += 1e-1 * np.sign(alpha_init[:, j])

    mu_init = np.mean(y, axis=1)

    return beta_init, alpha_init, mu_init


def _order_alpha(alpha):
    """Reorders the groups based on the first value of alpha"""
    # FIXME this is not the best way to do this
    # But it works for now
    mapping = np.argsort(alpha[0])
    ordered_alpha = alpha[:, mapping]
    return ordered_alpha


def _compute_resid(y, x, beta, alpha, factors, lambdas, N, T):
    diff = beta[:, :, None] - alpha[:, None, :]
    dist2 = np.sum(diff**2, axis=0)
    nearest = np.argmin(dist2, axis=1)
    beta_rounded = alpha[:, nearest]

    return y - np.sum(x * beta_rounded.T[:, None, :], axis=2) - (factors @ lambdas).T


[docs] def interactive_effects_estimation( y: np.ndarray, x: np.ndarray, N: int, T: int, K: int, G: int, R: int, max_iter: int = 1000, only_bfgs: bool = True, tol: float = 1e-6, kappa: float = 0.1, ): """Internal function to estimate the interactive effects model as described by Su and Ju (2018). Args: y (np.ndarray): Dependent variable, shape (N, T, 1). x (np.ndarray): Explanatory variables, shape (N, T, K). N (int): Number of individuals (cross-sectional units). T (int): Number of time periods. K (int): Number of explanatory variables. G (int): Number of groups. R (int): Number of common factors (global). max_iter (int, optional): Maximum number of acceptable iterations, may be too large. Defaults to 1000. only_bfgs (bool, optional): Only uses BFGS. Defaults to True. tol (float, optional): Acceptable tolerance for the stopping condition. Defaults to 1e-6. kappa (float, optional): Kappa penalty parameter. Defaults to 0.1. Returns: tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: - beta: Estimated coefficients for each individual, shape (K, N) - alpha: Ordered group-level representative coefficients, shape (K, G) - lambdas: Factor loadings, shape (R, N) - factors: Common factors, shape (T, R) - resid: Residuals of the model, shape (N, T) """ y = np.squeeze(y, axis=2) beta, alpha, _ = _generate_initial_estimates(y, x, N, T, K, G) alpha_prev = alpha.copy() obj_value = np.inf last_obj_value = np.inf for i in range(max_iter): for j in range(G): alpha_fixed = alpha.copy() def unpack_local(theta): beta = theta[: K * N].reshape(K, N) alpha = alpha_fixed.copy() alpha[:, j : j + 1] = theta[K * N :].reshape(K, 1) return beta, alpha def obj_local(theta): beta, alpha = unpack_local(theta) return _objective_value_without_individual_effects(y, x, beta, alpha, kappa, N, R, T) def pack_local(beta, alpha): return np.concatenate((beta.flatten(), alpha[:, j].flatten()), axis=0) if i % 2 == 0 or only_bfgs: minimizer = opt.minimize( obj_local, pack_local(beta, alpha), method="L-BFGS-B", options={"maxiter": 100 if only_bfgs else 10}, tol=1e-4, ) beta, alpha = unpack_local(minimizer.x) obj_value = minimizer.fun # print(f"BFGS Iteration {i}, Group {j}, Objective Value: {obj_value:.6f}") else: minimizer = opt.minimize( obj_local, pack_local(beta, alpha), method="Nelder-Mead", options={"adaptive": True, "maxiter": 100}, tol=1e-6, ) beta, alpha = unpack_local(minimizer.x) obj_value = minimizer.fun # print(f"Nelder-Mead Iteration {i}, Group {j}, Objective Value: {obj_value:.6f}") if ( np.abs(obj_value - last_obj_value) < tol and np.linalg.norm(alpha - alpha_prev) / np.linalg.norm(alpha_prev) < tol ): # print("Convergence reached.") break last_obj_value = obj_value alpha_prev = alpha.copy() res = (np.squeeze(y) - np.sum(x * beta.T[:, None, :], axis=2)).T res_var = (res @ res.T) / N factors = eigh(res_var).eigenvectors[:, -R:] factors = factors[:, ::-1] # Reverse to have descending order lambdas = np.zeros((R, N), dtype=np.float32) for i in range(R): lambdas[i, :] = factors[:, i].T @ res resid = _compute_resid(y, x, beta, alpha, factors, lambdas, N, T) return beta, _order_alpha(alpha), lambdas, factors, resid