LuluShi Blog
open main menu
Part of series:StatGenetic

LDSC

/ 12 min read

LDSC可以说是在统计遗传学中一个非常经典的算法了,一般用于估计summary data的遗传力以及评估数据的inflation, 当然,近年来,基于LDSC已经衍生出了非常多的工具,比如MEGMA, scDRS, scPagwas, gsMap等等。

假设

LDSC

LDSC系列梦最开始的地方

设 SNP $j$ 的 LD score 为 $l_j = \sum_k r_{jk}^2$(对回归 SNP 集合求和),GWAS 的边际检验统计量为 $\chi_j^2 = z_j^2$。在多基因模型下

(U2)

$$E[\chi_j^2 \mid l_j] = 1 + \frac{N h^2}{M} l_j + N a \tag{Main}$$

其中 $N$ 是样本量,$M$ 是用于建模的 SNP 数(通常是回归用的一套 SNP,例如 HM3 SNP),$h^2$ 是 SNP-based 遗传力(观测尺度),$a$ 吸收群体分层/隐性亲缘等导致的“与 LD score 无关”的均匀膨胀项。

把它写成线性回归形式:

(U3)

$$\chi_j^2 = \alpha + \beta l_j + \epsilon_j, \quad \alpha \approx 1 + Na, \quad \beta \approx \frac{N h^2}{M}$$

所以:

(U4)

$$\hat h^2 = \hat\beta \, \frac{M}{N}$$

期望式的推导

从个体水平的线性模型开始(把每列基因型标准化到均值 0 方差 1)

(U1)

$$y = X\beta + \epsilon, \quad E[\beta] = 0, \quad \mathrm{Var}(\beta_k) = \frac{h^2}{M}$$

对 SNP $j$ 做边际回归的估计量可写成(略去常数/标准化细节):

$$\hat\beta_j \approx \sum_k r_{jk}\beta_k + \text{noise}$$

平方取期望,利用 $\beta_k$ 独立且方差相同($\mathrm{Var}(\beta_k) = \frac{h^2}{M}$),得到遗传部分的贡献:

$$E[\hat\beta_j^2] \supset \sum_k r_{jk}^2 \,\mathrm{Var}(\beta_k) = \frac{h^2}{M} \sum_k r_{jk}^2 = \frac{h^2}{M} l_j$$

把边际统计量换成 $\chi^2 = z^2$ 并乘上样本量尺度,就得到 $E[\chi^2 \mid l]$ 随 $l$ 线性增长。再加上与 $l$ 无关的均匀偏倚项 $Na$,就得到式 (Main)。

import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Optional, Tuple

# ------------------------------------------------------------
# Basic linear algebra: Weighted Least Squares
# ------------------------------------------------------------

def make_blocks(df: pd.DataFrame, n_blocks: int = 200,
                chr_col: str = "CHR", bp_col: str = "BP") -> np.ndarray:
    """
    Assign each row of `df` to one of `n_blocks` roughly equal-sized,
    contiguous blocks of adjacent SNPs (for the block jackknife).

    When CHR/BP columns are present, contiguity follows genomic order
    (CHR first, then BP); otherwise the current row order is used.
    Returns an int array of block ids aligned with the original row order.
    """
    n = len(df)
    if chr_col in df.columns and bp_col in df.columns:
        # np.lexsort treats the LAST key as primary: sort by CHR, then BP
        order = np.lexsort((df[bp_col].values, df[chr_col].values))
    else:
        order = np.arange(n)

    # rank[i] = position of row i in the sorted (contiguous) layout
    rank = np.empty_like(order)
    rank[order] = np.arange(len(order))

    # equal-count split along the sorted layout; clamp the rare edge hit
    ids_along_sorted = np.floor(np.arange(n) * n_blocks / n).astype(int)
    ids_along_sorted[ids_along_sorted == n_blocks] = n_blocks - 1
    return ids_along_sorted[rank]

# ------------------------------------------------------------
#  IRWLS for LDSC regression
# ------------------------------------------------------------
def ldsc_irwls_1d(
    y: np.ndarray, L: np.ndarray, Lw: Optional[np.ndarray] = None,
    n_iter: int = 2,  # the ldsc reference implementation uses 2 update rounds
    constrain_intercept: Optional[float] = None
) -> Tuple[np.ndarray, np.ndarray]:
    """
    IRWLS fit of  y = alpha + beta * L + eps   (U3/B3).

    Weights ~ 1 / (Lw * Var(y|L)):
      - LD correlation approximation: weight by 1/max(Lw, 1)
      - heteroskedasticity: Var(chi2 | L) ~ 2*mu^2 with mu = alpha + beta*L

    Parameters
    ----------
    y : response (chi2 or z1*z2).
    L : LD scores used as the regressor.
    Lw : LD scores used for weighting; defaults to L.
    n_iter : number of reweighted refits after the initial equal-weight fit.
    constrain_intercept : if given, alpha is fixed to this value and only
        beta is estimated.

    Returns
    -------
    ([alpha, beta], weights); alpha echoes `constrain_intercept` when fixed.
    """
    y = y.astype(float)
    L = L.astype(float)
    Lw = (L if Lw is None else Lw).astype(float)

    # Single code path for both the free- and fixed-intercept fits: with a
    # fixed intercept c we regress (y - c) on L alone and add c back when
    # computing the fitted mean mu.  (Previously the IRWLS loop was
    # duplicated across the two branches.)
    if constrain_intercept is None:
        X = np.column_stack([np.ones_like(L), L])
        offset = 0.0
        target = y
    else:
        X = L.reshape(-1, 1)
        offset = float(constrain_intercept)
        target = y - offset

    w = np.ones_like(target)  # start with equal weights
    coef = wls_fit(X, target, w)
    for _ in range(n_iter):
        mu = offset + X @ coef
        mu = np.clip(mu, 1e-6, None)  # keep fitted means positive
        # heteroskedasticity weight 1/(2*mu^2) times LD weight 1/max(Lw,1)
        w = (1.0 / (2.0 * mu * mu)) * (1.0 / np.maximum(Lw, 1.0))
        coef = wls_fit(X, target, w)

    if constrain_intercept is None:
        return coef, w
    # return [alpha, beta] for a uniform interface with the free fit
    return np.array([offset, coef[0]]), w

def jackknife_se(
    df: pd.DataFrame, y: np.ndarray, L: np.ndarray, Lw: Optional[np.ndarray],
    n_blocks: int = 200, n_iter: int = 2,
    constrain_intercept: Optional[float] = None
) -> np.ndarray:
    """
    Block-jackknife standard errors for the [alpha, beta] LDSC estimates.

    Refits the regression with each contiguous SNP block deleted in turn,
    then applies Var_jk = (B-1)/B * sum_b (theta_-b - mean(theta_-b))^2.
    """
    block_ids = make_blocks(df, n_blocks=n_blocks)
    n_used = block_ids.max() + 1

    estimates = np.vstack([
        ldsc_irwls_1d(
            y[block_ids != b],
            L[block_ids != b],
            None if Lw is None else Lw[block_ids != b],
            n_iter=n_iter,
            constrain_intercept=constrain_intercept,
        )[0]
        for b in range(n_used)
    ])
    centered = estimates - estimates.mean(axis=0)
    return np.sqrt((n_used - 1) / n_used * np.sum(centered ** 2, axis=0))

# Univariate LDSC: (U2)-(U5)
def ldsc_h2(
    sumstats: pd.DataFrame,
    ldscore: pd.DataFrame,
    snp_col: str = "SNP",
    z_col: str = "Z",
    n_col: str = "N",
    ld_col: str = "L2",
    wld_col: Optional[str] = None,  # optional separate weight LD
    M: Optional[float] = None,
    n_blocks: int = 200,
    n_iter: int = 2,
) -> Dict[str, float]:
    """
    Univariate LDSC heritability regression.

    Implements:
      (U2) E[chi2 | L] = 1 + (N*h2/M)*L + N*a
      (U3) chi2 = alpha + beta*L + eps       (IRWLS fit)
      (U4) h2 = beta * M / N
      (U5) ratio = (alpha - 1) / (mean(chi2) - 1)

    Parameters
    ----------
    sumstats : DataFrame with SNP id, Z score and per-SNP sample size.
    ldscore : DataFrame with SNP id and LD score column(s).
    M : SNP count used for scaling; defaults to the number of merged SNPs
        (in practice, use .l2.M_5_50 from the ldsc reference files).

    Returns
    -------
    Dict with h2, intercept, slope, their jackknife SEs and diagnostics.

    Raises
    ------
    ValueError if no SNPs survive the merge.
    """
    keep = [snp_col, ld_col] + ([wld_col] if wld_col else [])
    df = sumstats[[snp_col, z_col, n_col]].merge(
        ldscore[keep], on=snp_col, how="inner").dropna()
    if df.empty:
        raise ValueError("No overlapping SNPs between sumstats and ldscore.")

    z = df[z_col].astype(float).values
    chi2 = z * z  # chi^2 = z^2 for a 1-df marginal test
    L = df[ld_col].astype(float).values
    Lw = df[wld_col].astype(float).values if wld_col else L

    Nbar = df[n_col].astype(float).mean()
    if M is None:
        M = float(len(df))  # in practice, use .l2.M_5_50 from ldsc ref files

    # fit (U3)
    (alpha, beta), _ = ldsc_irwls_1d(chi2, L, Lw=Lw, n_iter=n_iter)
    h2 = beta * M / Nbar  # (U4)

    # block jackknife SEs for [alpha, beta]
    se_alpha, se_beta = jackknife_se(df, chi2, L, Lw, n_blocks=n_blocks, n_iter=n_iter)
    se_h2 = se_beta * M / Nbar

    # (U5): share of inflation attributable to confounding;
    # undefined when the mean chi2 shows no inflation at all
    mean_chi2 = chi2.mean()
    ratio = (alpha - 1.0) / (mean_chi2 - 1.0) if mean_chi2 > 1 else np.nan

    return {
        "M": M,
        "N_mean": Nbar,
        "h2": float(h2),
        "h2_se": float(se_h2),
        "intercept": float(alpha),
        "intercept_se": float(se_alpha),
        "slope_beta": float(beta),
        "slope_se": float(se_beta),
        "mean_chi2": float(mean_chi2),
        "ratio": float(ratio),
        "n_snps_used": int(len(df)),
    }

LDSC (rg)

LDSC的一个拓展,用于计算两个性状之间的correlation

把单表型的 $\chi^2 = z^2$ 换成两个表型 z-score 的乘积:

(B1)

$$E[z_{1j} z_{2j}] = \frac{\sqrt{N_1 N_2}}{M} \rho_g l_j + \text{Intercept}$$

其中 ρg\rho_ggenetic covariance (很多地方把它写成 covariance,再由它除以两边 h2h^2 得到 rgr_g)。

(B2) 当有重叠样本

$$\text{Intercept} = \frac{\rho N_s}{\sqrt{N_1 N_2}}$$

$N_s$ 是重叠人数,$\rho$ 是重叠样本中的表型相关。

于是回归:

(B3)

$$z_{1j} z_{2j} = \alpha_{12} + \beta_{12} l_j + \epsilon_j, \quad \beta_{12} \approx \frac{\sqrt{N_1 N_2}}{M} \rho_g$$

(B4)

$$\hat\rho_g = \hat\beta_{12} \frac{M}{\sqrt{N_1 N_2}}, \quad \hat r_g = \frac{\hat\rho_g}{\sqrt{\hat h_1^2 \hat h_2^2}}$$

并且 sample overlap / shared stratification 主要影响截距, 不影响斜率(因此 rg 不被 sample overlap 偏倚是它的卖点之一)

Bivariate LDSC

def ldsc_rg(
    sumstats1: pd.DataFrame,
    sumstats2: pd.DataFrame,
    ldscore: pd.DataFrame,
    h2_1: float,
    h2_2: float,
    snp_col: str = "SNP",
    z1_col: str = "Z",
    z2_col: str = "Z",
    n1_col: str = "N",
    n2_col: str = "N",
    ld_col: str = "L2",
    wld_col: Optional[str] = None,
    M: Optional[float] = None,
    n_blocks: int = 200,
    n_iter: int = 2,
) -> Dict[str, float]:
    """
    Bivariate LDSC genetic-correlation regression.

    (B1) E[z1*z2 | L] = (sqrt(N1*N2)/M) * rho_g * L + intercept
    (B3) z1*z2 = a12 + b12*L + eps   (IRWLS fit)
    (B4) rho_g = b12 * M / sqrt(N1*N2);  rg = rho_g / sqrt(h2_1*h2_2)

    h2_1 / h2_2 are the (pre-estimated) SNP heritabilities of each trait.

    Raises
    ------
    ValueError if no SNPs survive the merges.
    """
    # Rename columns up front: pandas only appends merge suffixes to columns
    # that actually collide, so df[f"{z1_col}1"] would KeyError whenever the
    # two sumstats use different Z/N column names.  Explicit internal names
    # make the lookups unambiguous for any input column names.
    s1 = sumstats1[[snp_col, z1_col, n1_col]].rename(
        columns={z1_col: "_Z1_", n1_col: "_N1_"})
    s2 = sumstats2[[snp_col, z2_col, n2_col]].rename(
        columns={z2_col: "_Z2_", n2_col: "_N2_"})
    df = s1.merge(s2, on=snp_col, how="inner").merge(
        ldscore[[snp_col, ld_col] + ([wld_col] if wld_col else [])],
        on=snp_col, how="inner"
    ).dropna()
    if df.empty:
        raise ValueError("No overlapping SNPs across sumstats1, sumstats2 and ldscore.")

    z1 = df["_Z1_"].astype(float).values
    z2 = df["_Z2_"].astype(float).values
    y = z1 * z2  # dependent variable in bivariate LDSC

    L = df[ld_col].astype(float).values
    Lw = df[wld_col].astype(float).values if wld_col else L

    N1 = df["_N1_"].astype(float).mean()
    N2 = df["_N2_"].astype(float).mean()
    if M is None:
        M = float(len(df))

    # fit (B3)
    (a12, b12), _ = ldsc_irwls_1d(y, L, Lw=Lw, n_iter=n_iter)

    rho_g = b12 * M / np.sqrt(N1 * N2)  # (B4)
    rg = rho_g / np.sqrt(h2_1 * h2_2)

    se_a12, se_b12 = jackknife_se(df, y, L, Lw, n_blocks=n_blocks, n_iter=n_iter)
    se_rho_g = se_b12 * M / np.sqrt(N1 * N2)

    # simple delta approximation treating h2_1/h2_2 as fixed
    # (a full rg SE would also jackknife h2_1 and h2_2)
    se_rg = se_rho_g / np.sqrt(h2_1 * h2_2)

    return {
        "M": M,
        "N1_mean": N1,
        "N2_mean": N2,
        "gencov_rho_g": float(rho_g),
        "gencov_se": float(se_rho_g),
        "rg": float(rg),
        "rg_se_approx": float(se_rg),
        "intercept": float(a12),
        "intercept_se": float(se_a12),
        "slope_b12": float(b12),
        "slope_se": float(se_b12),
        "n_snps_used": int(len(df)),
    }

S-LDSC

LDSC的另一个非常强大的拓展,S-LDSC 的思路是:允许不同注释类别对“每 SNP 的方差贡献”不同。

(S1) 效应方差模型

$$\mathrm{Var}(\beta_j) = \sum_c \tau_c a_{jc}$$

其中 $a_{jc}$ 是注释(0/1 或连续),$\tau_c$ 是该注释的系数。

定义“对类别 $c$ 的 LD score”:

(S2)

$$l(j,c) = \sum_k r_{jk}^2 a_{kc}$$

则有

(S3)

$$E[\chi_j^2] = 1 + N \sum_c \tau_c \, l(j,c) + N a$$

并据此对 $\chi^2$ 做多元回归估计 $\tau_c$。标准误用 block jackknife(常用 200 个相邻 SNP 块)。

import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Optional, Tuple

# ------------------------------------------------------------
# Basic linear algebra: Weighted Least Squares
# ------------------------------------------------------------

def make_blocks(df: pd.DataFrame, n_blocks: int = 200,
                chr_col: str = "CHR", bp_col: str = "BP") -> np.ndarray:
    """
    Label every row of `df` with a jackknife block id in [0, n_blocks), so
    that blocks are contiguous runs of adjacent SNPs.

    Rows are laid out by (CHR, BP) when those columns exist, otherwise by
    their current order; the layout is then cut into equal-count pieces.
    """
    total = len(df)
    if chr_col in df.columns and bp_col in df.columns:
        # the last lexsort key is primary: order by chromosome, then position
        perm = np.lexsort((df[bp_col].values, df[chr_col].values))
    else:
        perm = np.arange(total)

    # equal-count labels along the sorted layout, clamped at the top edge
    labels_in_layout = np.floor(np.arange(total) * n_blocks / total).astype(int)
    labels_in_layout[labels_in_layout == n_blocks] = n_blocks - 1

    # scatter the labels back to each row's original position
    out = np.empty_like(labels_in_layout)
    out[perm] = labels_in_layout
    return out


def wls_fit(X: np.ndarray, y: np.ndarray, w: np.ndarray) -> np.ndarray:
    """
    Weighted least squares: minimize sum_i w_i * (y_i - X_i @ beta)^2.

    Implemented by rescaling each row with sqrt(w_i) and solving the
    resulting ordinary least-squares problem.
    """
    sqrt_w = np.sqrt(w.reshape(-1, 1))
    coef, *_ = np.linalg.lstsq(X * sqrt_w, y.reshape(-1, 1) * sqrt_w, rcond=None)
    return coef.ravel()

def _sldsc_irwls(X: np.ndarray, y: np.ndarray, Lw: np.ndarray, n_iter: int) -> np.ndarray:
    """One S-LDSC IRWLS fit: OLS start, then `n_iter` reweighted refits.

    Weights combine the heteroskedasticity term 1/(2*mu^2), with mu = X @ b
    clipped positive, and the LD correlation approximation 1/max(Lw, 1).
    """
    w = np.ones(len(y))
    b = wls_fit(X, y, w)
    for _ in range(n_iter):
        mu = np.clip(X @ b, 1e-6, None)  # keep fitted means positive
        w = (1.0 / (2.0 * mu * mu)) * (1.0 / np.maximum(Lw, 1.0))
        b = wls_fit(X, y, w)
    return b


def sldsc_tau(
    sumstats: pd.DataFrame,
    ldscore_multi: pd.DataFrame,
    ld_cols: List[str],
    snp_col: str = "SNP",
    z_col: str = "Z",
    n_col: str = "N",
    wld_col: Optional[str] = None,
    n_blocks: int = 200,
    n_iter: int = 2,
) -> Dict[str, np.ndarray]:
    """
    Stratified LDSC (S-LDSC) regression.

    (S3) chi2 = alpha + N * sum_c tau_c * L(j,c) + eps.
    We regress chi2 on the annotation LD scores L(j,c) with an intercept and
    return coefficients on the chi2 scale (alpha + sum_c gamma_c * L_c).
    Converting gamma -> tau requires the N scaling and per-annotation M
    bookkeeping, exactly as in S-LDSC.

    Returns a dict with coefficients, block-jackknife SEs and metadata.

    Raises
    ------
    ValueError if no SNPs survive the merge.
    """
    cols = [snp_col, z_col, n_col]
    df = sumstats[cols].merge(
        ldscore_multi[[snp_col] + ld_cols + ([wld_col] if wld_col else [])],
        on=snp_col, how="inner").dropna()
    if df.empty:
        raise ValueError("No overlapping SNPs between sumstats and ldscore_multi.")

    z = df[z_col].astype(float).values
    chi2 = z * z
    Nbar = df[n_col].astype(float).mean()

    # design matrix: intercept + one column per annotation LD score
    X = np.column_stack([np.ones(len(df))] + [df[c].astype(float).values for c in ld_cols])
    # 1D weight LD score if provided, else the sum of the annotation LD scores
    if wld_col:
        Lw = df[wld_col].astype(float).values
    else:
        Lw = np.sum(X[:, 1:], axis=1)

    # full-sample fit (previously this IRWLS loop was duplicated inside the
    # jackknife loop; it is now shared via _sldsc_irwls)
    b = _sldsc_irwls(X, chi2, Lw, n_iter)

    # block jackknife over contiguous SNP blocks
    blocks = make_blocks(df, n_blocks=n_blocks)
    B = blocks.max() + 1
    dels = np.vstack([
        _sldsc_irwls(X[blocks != k], chi2[blocks != k], Lw[blocks != k], n_iter)
        for k in range(B)
    ])
    mean_del = dels.mean(axis=0)
    se = np.sqrt((B - 1) / B * np.sum((dels - mean_del) ** 2, axis=0))

    return {
        "N_mean": Nbar,
        "coef": b,          # [alpha, gamma_1, ..., gamma_C]
        "coef_se": se,
        "n_snps_used": len(df),
        "ld_cols": ld_cols,
    }