LDSC
LDSC可以说是在统计遗传学中一个非常经典的算法了,一般用于估计summary data的遗传力以及评估数据的inflation,
当然,近年来,基于LDSC已经衍生出了非常多的工具,比如MEGMA, scDRS, scPagwas, gsMap等等。
假设
LDSC
LDSC 系列“梦开始的地方”
设 SNP $j$ 的 LD score 为 $\ell_j=\sum_k r_{jk}^2$(对回归 SNP 集合求和),GWAS 的边际检验统计量为 $\chi_j^2 = N\hat\beta_j^2$。在多基因模型下

$$E[\chi_j^2 \mid \ell_j] = \frac{N h_g^2}{M}\,\ell_j + N a + 1 \tag{U2}$$

其中 $N$ 是样本量,$M$ 是用于建模的 SNP 数(通常是回归用的一套 SNP,例如 HM3 SNP),$h_g^2$ 是 SNP-based 遗传力(观测尺度),$a$ 吸收群体分层/隐性亲缘等导致的“与 LD score 无关”的均匀膨胀项。
把它写成线性回归形式:

$$\chi_j^2 = \alpha + \beta\,\ell_j + \varepsilon_j,\qquad \alpha = 1 + Na,\quad \beta = \frac{N h_g^2}{M} \tag{U3}$$

所以:

$$\hat h_g^2 = \hat\beta \cdot \frac{M}{N} \tag{U4}$$
期望式的推导
从个体水平的线性模型开始(把每列基因型标准化到均值 0 方差 1):

$$y = X\beta + \varepsilon,\qquad \operatorname{Var}(\beta_k) = \frac{h_g^2}{M} \tag{U1}$$

对 SNP $j$ 做边际回归的估计量可写成(略去常数/标准化细节):

$$\hat\beta_j = \frac{1}{N}\, x_j^\top y$$

平方取期望,利用各 $\beta_k$ 独立且方差相同($h_g^2/M$),得到遗传部分的贡献:

$$E[\hat\beta_j^2] \approx \frac{h_g^2}{M}\sum_k r_{jk}^2 + \frac{1-h_g^2}{N} = \frac{h_g^2}{M}\,\ell_j + \frac{1-h_g^2}{N}$$

把边际统计量换成 $\chi_j^2 = N\hat\beta_j^2$ 并乘上样本量尺度,就得到 $E[\chi_j^2]$ 随 $\ell_j$ 线性增长。
再加上与 $\ell_j$ 无关的均匀偏倚项 $Na$,就得到主式 (U2)。
import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Optional, Tuple
# ------------------------------------------------------------
# Basic linear algebra: Weighted Least Squares
# ------------------------------------------------------------
def make_blocks(df: pd.DataFrame, n_blocks: int = 200,
                chr_col: str = "CHR", bp_col: str = "BP") -> np.ndarray:
    """
    Assign each row of ``df`` to one of ``n_blocks`` ~equal-count blocks of
    genomically adjacent SNPs (used by the block jackknife).

    If CHR/BP columns exist, rows are ranked by (CHR, BP); otherwise the
    current row order is assumed to already be genomic order.

    Returns
    -------
    np.ndarray of int, shape (len(df),)
        Block id in ``0..n_blocks-1`` for each row, in original row order.

    Raises
    ------
    ValueError
        If ``df`` is empty. (An empty result would make ``blocks.max()``
        in the jackknife code fail with an obscure numpy error; fail early
        with a clear message instead.)
    """
    n = len(df)
    if n == 0:
        raise ValueError("make_blocks: empty DataFrame")
    if chr_col in df.columns and bp_col in df.columns:
        # lexsort: the LAST key (CHR) is primary; BP breaks ties within a chromosome
        order = np.lexsort((df[bp_col].values, df[chr_col].values))
    else:
        order = np.arange(n)
    inv = np.empty_like(order)
    inv[order] = np.arange(n)
    # Equal-count split along the sorted order; the clamp guards against a
    # floating-point round-up producing block id == n_blocks at the last row.
    block_id_sorted = np.floor(np.arange(n) * n_blocks / n).astype(int)
    block_id_sorted[block_id_sorted == n_blocks] = n_blocks - 1
    # Map block ids back from sorted positions to the original row order.
    return block_id_sorted[inv]
# ------------------------------------------------------------
# IRWLS for LDSC regression
# ------------------------------------------------------------
def ldsc_irwls_1d(
    y: np.ndarray, L: np.ndarray, Lw: Optional[np.ndarray] = None,
    n_iter: int = 2,  # the ldsc reference implementation uses 2 update rounds
    constrain_intercept: Optional[float] = None
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Iteratively reweighted least squares for the 1-D LDSC regression

        y = alpha + beta * L + eps            (U3/B3)

    Weights combine two terms (as in ldsc):
      * heteroskedasticity: Var(chi2 | L) ~ 2 * mu^2 with mu = alpha + beta*L
      * LD redundancy: downweight by 1 / max(Lw, 1)

    Parameters
    ----------
    y : chi^2 (univariate) or z1*z2 (bivariate) per SNP.
    L : LD scores used as the regressor.
    Lw : LD scores used for weighting; defaults to ``L``.
    n_iter : number of weight-update rounds after the initial equal-weight fit.
    constrain_intercept : if given, the intercept is fixed at this value and
        only the slope is estimated.

    Returns
    -------
    (coef, w) : ``coef`` is always ``[alpha, beta]`` (``alpha`` equals the
        constrained value when ``constrain_intercept`` is set); ``w`` are the
        final regression weights.
    """
    y = np.asarray(y, dtype=float)
    L = np.asarray(L, dtype=float)
    Lw = L if Lw is None else np.asarray(Lw, dtype=float)

    constrained = constrain_intercept is not None
    if constrained:
        # Move the fixed intercept to the left-hand side: y - c = beta*L + eps
        offset = float(constrain_intercept)
        X = L.reshape(-1, 1)
        target = y - offset
    else:
        offset = 0.0
        X = np.column_stack([np.ones_like(L), L])
        target = y

    def _solve(w: np.ndarray) -> np.ndarray:
        # Weighted least squares via sqrt-weight row scaling + lstsq
        # (numerically identical to the module-level wls_fit helper).
        sw = np.sqrt(w).reshape(-1, 1)
        coef, *_ = np.linalg.lstsq(X * sw, target.reshape(-1, 1) * sw, rcond=None)
        return coef.reshape(-1)

    # Start from an equal-weight fit, then update the weights n_iter times.
    w = np.ones_like(y)
    coef = _solve(w)
    # The LD-redundancy weight does not depend on the fit; hoist it.
    w_ld = 1.0 / np.maximum(Lw, 1.0)
    for _ in range(n_iter):
        mu = offset + X @ coef
        mu = np.clip(mu, 1e-6, None)      # keep the predicted mean positive
        w = w_ld / (2.0 * mu * mu)        # 1/(2*mu^2) heteroskedasticity term
        coef = _solve(w)

    if constrained:
        # Return [alpha, beta] for a uniform interface with the free fit.
        return np.array([offset, coef[0]]), w
    return coef, w
def jackknife_se(
    df: pd.DataFrame, y: np.ndarray, L: np.ndarray, Lw: Optional[np.ndarray],
    n_blocks: int = 200, n_iter: int = 2,
    constrain_intercept: Optional[float] = None
) -> np.ndarray:
    """
    Delete-one-block jackknife standard errors for [alpha, beta].

    Each block of adjacent SNPs is left out in turn and the IRWLS regression
    is refit on the remainder; the jackknife variance is

        Var_jk = (B-1)/B * sum_b (theta_-b - mean(theta_-b))^2
    """
    blocks = make_blocks(df, n_blocks=n_blocks)
    n_used = blocks.max() + 1
    estimates = np.vstack([
        ldsc_irwls_1d(
            y[blocks != b],
            L[blocks != b],
            None if Lw is None else Lw[blocks != b],
            n_iter=n_iter,
            constrain_intercept=constrain_intercept,
        )[0]
        for b in range(n_used)
    ])
    centered = estimates - estimates.mean(axis=0)
    jk_var = (n_used - 1) / n_used * (centered ** 2).sum(axis=0)
    return np.sqrt(jk_var)
# Univariate LDSC: (U2)-(U5)
def ldsc_h2(
    sumstats: pd.DataFrame,
    ldscore: pd.DataFrame,
    snp_col: str = "SNP",
    z_col: str = "Z",
    n_col: str = "N",
    ld_col: str = "L2",
    wld_col: Optional[str] = None,  # optional separate weight LD
    M: Optional[float] = None,
    n_blocks: int = 200,
    n_iter: int = 2,
) -> Dict[str, float]:
    """
    Univariate LDSC heritability estimate.

    Implements:
      (U2) E[chi2 | L] = 1 + (N*h2/M)*L + N*a
      (U3) regression: chi2 = alpha + beta*L + eps
      (U4) h2 = beta * M / N
      (U5) Ratio = (alpha - 1) / (mean(chi2) - 1)
    """
    ld_keep = [snp_col, ld_col] + ([wld_col] if wld_col else [])
    merged = sumstats[[snp_col, z_col, n_col]].merge(
        ldscore[ld_keep], on=snp_col, how="inner"
    ).dropna()

    chi2 = merged[z_col].astype(float).values ** 2  # chi^2 = z^2
    L = merged[ld_col].astype(float).values
    Lw = merged[wld_col].astype(float).values if wld_col else L
    Nbar = merged[n_col].astype(float).mean()
    if M is None:
        # In practice, use .l2.M_5_50 from the ldsc reference files.
        M = float(len(merged))

    # Fit (U3), then rescale the slope to the heritability scale (U4).
    (alpha, beta), _ = ldsc_irwls_1d(chi2, L, Lw=Lw, n_iter=n_iter)
    h2 = beta * M / Nbar

    # Block jackknife SEs for intercept and slope.
    se_alpha, se_beta = jackknife_se(merged, chi2, L, Lw,
                                     n_blocks=n_blocks, n_iter=n_iter)
    mean_chi2 = chi2.mean()
    # (U5): fraction of inflation not attributable to polygenicity.
    ratio = (alpha - 1.0) / (mean_chi2 - 1.0) if mean_chi2 > 1 else np.nan
    return {
        "M": M,
        "N_mean": Nbar,
        "h2": float(h2),
        "h2_se": float(se_beta * M / Nbar),
        "intercept": float(alpha),
        "intercept_se": float(se_alpha),
        "slope_beta": float(beta),
        "slope_se": float(se_beta),
        "mean_chi2": float(mean_chi2),
        "ratio": float(ratio),
        "n_snps_used": int(len(merged)),
    }
LDSC (rg)
LDSC的一个拓展,用于计算两个性状之间的correlation
把单表型的 $\chi_j^2 = z_j^2$ 换成两个表型 z-score 的乘积:

$$E[z_{1j} z_{2j} \mid \ell_j] = \frac{\sqrt{N_1 N_2}\,\rho_g}{M}\,\ell_j + \frac{\rho\, N_s}{\sqrt{N_1 N_2}} \tag{B1}$$

其中 $\rho_g$ 是 genetic covariance(很多地方把它写成 genetic covariance,再由它除以两边的 $\sqrt{h_1^2 h_2^2}$ 得到 $r_g$)。

(B2) 当有重叠样本时,截距为 $\rho N_s / \sqrt{N_1 N_2}$:$N_s$ 是重叠人数,$\rho$ 是重叠样本中的表型相关。
于是回归:

$$z_{1j} z_{2j} = a_{12} + b_{12}\,\ell_j + \varepsilon_j \tag{B3}$$

$$\hat\rho_g = \hat b_{12}\,\frac{M}{\sqrt{N_1 N_2}},\qquad \hat r_g = \frac{\hat\rho_g}{\sqrt{h_1^2\, h_2^2}} \tag{B4}$$

并且 sample overlap / shared stratification 主要影响截距 $a_{12}$,不影响斜率(因此 rg 不被 sample overlap 偏倚是它的卖点之一)。
Bivariate LDSC
def ldsc_rg(
    sumstats1: pd.DataFrame,
    sumstats2: pd.DataFrame,
    ldscore: pd.DataFrame,
    h2_1: float,
    h2_2: float,
    snp_col: str = "SNP",
    z1_col: str = "Z",
    z2_col: str = "Z",
    n1_col: str = "N",
    n2_col: str = "N",
    ld_col: str = "L2",
    wld_col: Optional[str] = None,
    M: Optional[float] = None,
    n_blocks: int = 200,
    n_iter: int = 2,
) -> Dict[str, float]:
    """
    Bivariate LDSC genetic correlation.

    (B1) E[z1*z2] = (sqrt(N1*N2)/M) * rho_g * L + intercept
    (B3) regression: z1*z2 = a12 + b12 * L
    (B4) rho_g = b12 * M / sqrt(N1*N2);  rg = rho_g / sqrt(h2_1 * h2_2)

    ``h2_1`` / ``h2_2`` are the SNP heritabilities of the two traits (e.g.
    from ``ldsc_h2``); the rg SE below treats them as fixed.
    """
    # Rename trait columns up-front so downstream access never depends on
    # pandas merge suffixes. (The previous version used suffixes=("1","2"),
    # which only renames *overlapping* columns: with distinct z/n column
    # names the merge left them unsuffixed and df[f"{z1_col}1"] raised
    # KeyError.)
    s1 = sumstats1[[snp_col, z1_col, n1_col]].rename(
        columns={z1_col: "_Z1", n1_col: "_N1"})
    s2 = sumstats2[[snp_col, z2_col, n2_col]].rename(
        columns={z2_col: "_Z2", n2_col: "_N2"})
    df = s1.merge(s2, on=snp_col, how="inner").merge(
        ldscore[[snp_col, ld_col] + ([wld_col] if wld_col else [])],
        on=snp_col, how="inner"
    ).dropna()

    z1 = df["_Z1"].astype(float).values
    z2 = df["_Z2"].astype(float).values
    y = z1 * z2  # dependent var in bivariate LDSC
    L = df[ld_col].astype(float).values
    Lw = df[wld_col].astype(float).values if wld_col else L
    N1 = df["_N1"].astype(float).mean()
    N2 = df["_N2"].astype(float).mean()
    if M is None:
        M = float(len(df))

    # Fit (B3), then rescale the slope to genetic covariance (B4).
    (a12, b12), _ = ldsc_irwls_1d(y, L, Lw=Lw, n_iter=n_iter)
    rho_g = b12 * M / np.sqrt(N1 * N2)
    rg = rho_g / np.sqrt(h2_1 * h2_2)

    se_a12, se_b12 = jackknife_se(df, y, L, Lw, n_blocks=n_blocks, n_iter=n_iter)
    se_rho_g = se_b12 * M / np.sqrt(N1 * N2)
    # simple delta for rg treating h2 fixed (for a full rg SE you'd also JK h2_1,h2_2)
    se_rg = se_rho_g / np.sqrt(h2_1 * h2_2)
    return {
        "M": M,
        "N1_mean": N1,
        "N2_mean": N2,
        "gencov_rho_g": float(rho_g),
        "gencov_se": float(se_rho_g),
        "rg": float(rg),
        "rg_se_approx": float(se_rg),
        "intercept": float(a12),
        "intercept_se": float(se_a12),
        "slope_b12": float(b12),
        "slope_se": float(se_b12),
        "n_snps_used": int(len(df)),
    }
S-LDSC
LDSC的另一个非常强大的拓展,S-LDSC 的思路是:允许不同注释类别对“每 SNP 的方差贡献”不同。
效应方差模型:

$$\operatorname{Var}(\beta_j) = \sum_c a_c(j)\,\tau_c \tag{S1}$$

其中 $a_c(j)$ 是 SNP $j$ 在注释 $c$ 上的取值(0/1 或连续),$\tau_c$ 是该注释的系数。
定义“对类别 $c$ 的 LD score”:

$$\ell(j, c) = \sum_k a_c(k)\, r_{jk}^2 \tag{S2}$$

则有

$$E[\chi_j^2] = 1 + N a + N \sum_c \tau_c\, \ell(j, c) \tag{S3}$$

并据此对 $\chi_j^2$ 做多元回归估计 $\{\tau_c\}$。标准误用 block jackknife(常用 200 个相邻 SNP 块)。
import numpy as np
import pandas as pd
from scipy import stats
from typing import Dict, List, Optional, Tuple
# ------------------------------------------------------------
# Basic linear algebra: Weighted Least Squares
# ------------------------------------------------------------
def make_blocks(df: pd.DataFrame, n_blocks: int = 200,
                chr_col: str = "CHR", bp_col: str = "BP") -> np.ndarray:
    """
    Assign each row to one of ~``n_blocks`` equal-count blocks of adjacent
    SNPs (for the block jackknife).

    Rows are ranked by (CHR, BP) when those columns are present; otherwise
    the existing row order is taken as genomic order. The returned array
    gives the block id of each row in the ORIGINAL row order.
    """
    n_rows = len(df)
    has_pos = chr_col in df.columns and bp_col in df.columns
    order = (np.lexsort((df[bp_col].values, df[chr_col].values))
             if has_pos else np.arange(n_rows))
    # rank[i] = position of row i in the sorted order
    rank = np.empty_like(order)
    rank[order] = np.arange(len(order))
    # Equal-count block ids along the sorted order; clamp so the last
    # block id never reaches n_blocks.
    ids = np.floor(np.arange(n_rows) * n_blocks / n_rows).astype(int)
    ids[ids == n_blocks] = n_blocks - 1
    return ids[rank]
def wls_fit(X: np.ndarray, y: np.ndarray, w: np.ndarray) -> np.ndarray:
    """
    Weighted least squares: argmin_b sum_i w_i * (y_i - X_i b)^2.

    Scales rows by sqrt(w) and solves the resulting ordinary least-squares
    problem with ``lstsq`` (SVD-based, tolerant of rank deficiency).
    Returns the coefficient vector as a 1-D array.
    """
    root_w = np.sqrt(w.reshape(-1, 1))
    coef, _, _, _ = np.linalg.lstsq(X * root_w, y.reshape(-1, 1) * root_w,
                                    rcond=None)
    return coef.ravel()
def sldsc_tau(
    sumstats: pd.DataFrame,
    ldscore_multi: pd.DataFrame,
    ld_cols: List[str],
    snp_col: str = "SNP",
    z_col: str = "Z",
    n_col: str = "N",
    wld_col: Optional[str] = None,
    n_blocks: int = 200,
    n_iter: int = 2,
) -> Dict[str, np.ndarray]:
    """
    Stratified LDSC regression on the chi^2 scale.

    (S3) chi2 = alpha + N * sum_c tau_c * L(j,c) + eps

    In practice we regress chi2 on L(j,c) with an intercept; tau relates to
    the slope scale. Returned coefficients are on the chi^2 scale
    ([alpha, gamma_1, ..., gamma_C]); converting gamma -> tau needs the N
    scaling and per-annotation M bookkeeping, exactly as in S-LDSC.
    Standard errors come from a delete-one-block jackknife.
    """
    cols = [snp_col, z_col, n_col]
    df = sumstats[cols].merge(
        ldscore_multi[[snp_col] + ld_cols + ([wld_col] if wld_col else [])],
        on=snp_col, how="inner").dropna()
    z = df[z_col].astype(float).values
    chi2 = z * z
    Nbar = df[n_col].astype(float).mean()
    # Design matrix: intercept column + one LD-score column per annotation.
    X = np.column_stack([np.ones(len(df))] +
                        [df[c].astype(float).values for c in ld_cols])
    # Use the 1-D weight LD score if provided, else the summed L columns.
    if wld_col:
        Lw = df[wld_col].astype(float).values
    else:
        Lw = np.sum(X[:, 1:], axis=1)

    def _irwls(Xm: np.ndarray, ym: np.ndarray, Lwm: np.ndarray) -> np.ndarray:
        # Shared IRWLS routine for both the full fit and each jackknife
        # delete-one-block fit (previously duplicated inline).
        coef = wls_fit(Xm, ym, np.ones(len(ym)))
        # LD-redundancy weight is invariant across iterations; hoist it.
        w_ld = 1.0 / np.maximum(Lwm, 1.0)
        for _ in range(n_iter):
            mu = np.clip(Xm @ coef, 1e-6, None)  # keep predicted mean positive
            coef = wls_fit(Xm, ym, w_ld / (2.0 * mu * mu))
        return coef

    b = _irwls(X, chi2, Lw)

    # Block jackknife SEs.
    blocks = make_blocks(df, n_blocks=n_blocks)
    B = blocks.max() + 1
    dels = np.vstack([
        _irwls(X[blocks != bb], chi2[blocks != bb], Lw[blocks != bb])
        for bb in range(B)
    ])
    mean_del = dels.mean(axis=0)
    se = np.sqrt((B - 1) / B * np.sum((dels - mean_del) ** 2, axis=0))
    return {
        "N_mean": Nbar,
        "coef": b,  # [alpha, gamma_1, ..., gamma_C]
        "coef_se": se,
        "n_snps_used": len(df),
        "ld_cols": ld_cols,
    }