Implementing HMM Algorithms in Python
A backup of my personal study notes; written for my own use, correctness not guaranteed.
# -*- coding: UTF-8 -*-
import numpy as np


class HMM:
    def __init__(self, Ann, Bnm, pi1n):
        # State transition probability matrix
        self.A = np.array(Ann)
        # Observation probability matrix
        self.B = np.array(Bnm)
        # Initial probability distribution
        self.pi = np.array(pi1n)
        # Number of states
        self.N = self.A.shape[0]
        # Number of observation symbols
        self.M = self.B.shape[1]
    def printhmm(self):
        print("==================================================")
        print("HMM content: N =", self.N, ",M =", self.M)
        for i in range(self.N):
            if i == 0:
                print("hmm.A ", self.A[i, :], " hmm.B ", self.B[i, :])
            else:
                print("      ", self.A[i, :], "       ", self.B[i, :])
        print("hmm.pi", self.pi)
        print("==================================================")
    # Forward: evaluate P(O|lambda) with the forward algorithm.
    # T: length of the observation sequence; O: observation sequence
    # alpha: T x N work array; pprob: one-element list that receives the probability
    def Forward(self, T, O, alpha, pprob):
        # 1. Initialization
        for i in range(self.N):
            alpha[0, i] = self.pi[i] * self.B[i, O[0]]
        # 2. Induction
        for t in range(T - 1):
            for j in range(self.N):
                total = 0.0
                for i in range(self.N):
                    total += alpha[t, i] * self.A[i, j]
                alpha[t + 1, j] = total * self.B[j, O[t + 1]]
        # 3. Termination
        total = 0.0
        for i in range(self.N):
            total += alpha[T - 1, i]
        pprob[0] = total
    # Forward algorithm with scaling; pprob accumulates log P(O|lambda)
    def ForwardWithScale(self, T, O, alpha, scale, pprob):
        scale[0] = 0.0
        # 1. Initialization
        for i in range(self.N):
            alpha[0, i] = self.pi[i] * self.B[i, O[0]]
            scale[0] += alpha[0, i]
        for i in range(self.N):
            alpha[0, i] /= scale[0]
        # 2. Induction
        for t in range(T - 1):
            scale[t + 1] = 0.0
            for j in range(self.N):
                total = 0.0
                for i in range(self.N):
                    total += alpha[t, i] * self.A[i, j]
                alpha[t + 1, j] = total * self.B[j, O[t + 1]]
                scale[t + 1] += alpha[t + 1, j]
            for j in range(self.N):
                alpha[t + 1, j] /= scale[t + 1]
        # 3. Termination: log P(O|lambda) = sum_t log(scale[t])
        for t in range(T):
            pprob[0] += np.log(scale[t])
    # Backward: evaluate P(O|lambda) with the backward algorithm.
    # T: length of the observation sequence; O: observation sequence
    # beta: T x N work array; pprob: one-element list that receives the probability
    def Backward(self, T, O, beta, pprob):
        # 1. Initialization
        for i in range(self.N):
            beta[T - 1, i] = 1.0
        # 2. Induction
        for t in range(T - 2, -1, -1):
            for i in range(self.N):
                total = 0.0
                for j in range(self.N):
                    total += self.A[i, j] * self.B[j, O[t + 1]] * beta[t + 1, j]
                beta[t, i] = total
        # 3. Termination
        pprob[0] = 0.0
        for i in range(self.N):
            pprob[0] += self.pi[i] * self.B[i, O[0]] * beta[0, i]
    # Backward algorithm with scaling
    def BackwardWithScale(self, T, O, beta, scale):
        # 1. Initialization
        for i in range(self.N):
            beta[T - 1, i] = 1.0
        # 2. Induction
        for t in range(T - 2, -1, -1):
            for i in range(self.N):
                total = 0.0
                for j in range(self.N):
                    total += self.A[i, j] * self.B[j, O[t + 1]] * beta[t + 1, j]
                beta[t, i] = total / scale[t + 1]
    # Viterbi algorithm
    # Input: the model (A, B, pi) and observation sequence O.
    # Output: the state path I maximizing the joint probability, and that probability.
    def viterbi(self, O):
        T = len(O)
        # Initialization
        delta = np.zeros((T, self.N))
        phi = np.zeros((T, self.N), int)
        I = np.zeros(T, int)
        for i in range(self.N):
            delta[0, i] = self.pi[i] * self.B[i, O[0]]
            phi[0, i] = 0
        # Induction
        for t in range(1, T):
            for i in range(self.N):
                trans = np.array([delta[t - 1, j] * self.A[j, i] for j in range(self.N)])
                delta[t, i] = self.B[i, O[t]] * trans.max()
                phi[t, i] = trans.argmax()
        # Termination
        prob = delta[T - 1, :].max()
        I[T - 1] = delta[T - 1, :].argmax()
        # Backtrack the state sequence
        for t in range(T - 2, -1, -1):
            I[t] = phi[t + 1, I[t + 1]]
        return I, prob
    # Compute gamma: the probability that the Markov chain is in state Si at time t
    def ComputeGamma(self, T, alpha, beta, gamma):
        for t in range(T):
            denominator = 0.0
            for j in range(self.N):
                gamma[t, j] = alpha[t, j] * beta[t, j]
                denominator += gamma[t, j]
            for i in range(self.N):
                gamma[t, i] = gamma[t, i] / denominator
    # Compute xi(i, j): given the training sequence O and the model lambda, the
    # probability of being in state Si at time t and in state Sj at time t+1
    def ComputeXi(self, T, O, alpha, beta, gamma, xi):
        for t in range(T - 1):
            total = 0.0
            for i in range(self.N):
                for j in range(self.N):
                    xi[t, i, j] = (
                        alpha[t, i]
                        * beta[t + 1, j]
                        * self.A[i, j]
                        * self.B[j, O[t + 1]]
                    )
                    total += xi[t, i, j]
            for i in range(self.N):
                for j in range(self.N):
                    xi[t, i, j] /= total
    # Baum-Welch algorithm
    # Initial model: HMM = {A, B, pi, N, M}
    # Input: L observation sequences O
    def BaumWelch(self, L, T, O, alpha, beta, gamma):
        print("BaumWelch")
        DELTA = 0.01
        iteration = 0
        flag = 1
        probf = [0.0]
        probprev = 0.0
        deltaprev = 10e-70
        xi = np.zeros((T, self.N, self.N))
        pi = np.zeros(self.N)
        denominatorA = np.zeros(self.N)
        denominatorB = np.zeros(self.N)
        numeratorA = np.zeros((self.N, self.N))
        numeratorB = np.zeros((self.N, self.M))
        scale = np.zeros(T)
        while True:
            probf[0] = 0
            # E-step
            for l in range(L):
                self.ForwardWithScale(T, O[l], alpha, scale, probf)
                self.BackwardWithScale(T, O[l], beta, scale)
                self.ComputeGamma(T, alpha, beta, gamma)
                self.ComputeXi(T, O[l], alpha, beta, gamma, xi)
                for i in range(self.N):
                    pi[i] += gamma[0, i]
                    for t in range(T - 1):
                        denominatorA[i] += gamma[t, i]
                        denominatorB[i] += gamma[t, i]
                    denominatorB[i] += gamma[T - 1, i]
                    for j in range(self.N):
                        for t in range(T - 1):
                            numeratorA[i, j] += xi[t, i, j]
                    for k in range(self.M):
                        for t in range(T):
                            if O[l][t] == k:
                                numeratorB[i, k] += gamma[t, i]
            # M-step
            # Re-estimate pi, the transition matrix, and the observation matrix
            # (the 0.001/0.999 terms add a small amount of smoothing)
            for i in range(self.N):
                self.pi[i] = 0.001 / self.N + 0.999 * pi[i] / L
                for j in range(self.N):
                    self.A[i, j] = (
                        0.001 / self.N + 0.999 * numeratorA[i, j] / denominatorA[i]
                    )
                    numeratorA[i, j] = 0.0
                for k in range(self.M):
                    self.B[i, k] = (
                        0.001 / self.M + 0.999 * numeratorB[i, k] / denominatorB[i]
                    )
                    numeratorB[i, k] = 0.0
                pi[i] = denominatorA[i] = denominatorB[i] = 0.0
            if flag == 1:
                flag = 0
                probprev = probf[0]
                ratio = 1
                continue
            delta = probf[0] - probprev
            ratio = delta / deltaprev
            probprev = probf[0]
            deltaprev = delta
            iteration += 1
            if ratio <= DELTA:
                print("num iteration ", iteration)
                break
if __name__ == "__main__":
    print("python my HMM")
    # Example: tossing two coins and observing heads/tails.
    # State set Q: [q0, q1]; q0: coin 0, q1: coin 1
    # Transition matrix A: [[q0 -> q0, q0 -> q1], [q1 -> q0, q1 -> q1]]
    A = [[0.75, 0.25], [0.25, 0.75]]
    # Observation set V: [v0, v1]; v0: heads, v1: tails
    # Observation matrix B: [[v0|q0, v1|q0], [v0|q1, v1|q1]]
    B = [[0.51, 0.49], [0.48, 0.52]]
    # Initial distribution pai: the probability of being in state qi at time t=1
    pai = [0.5, 0.5]
    # Initialize the HMM
    hmm = HMM(A, B, pai)
    # The state sequences I are unknown; the observation sequences O are known
    O = [
        [1, 0, 0, 1, 1, 0, 0, 0, 0],
        [1, 1, 0, 1, 0, 0, 1, 1, 0],
        [0, 0, 1, 1, 0, 0, 1, 1, 1],
    ]
    # Number of observation sequences
    L = len(O)
    # Length of each observation sequence
    T = len(O[0])
    # Number of states
    lA = len(A)
    alpha = np.zeros((T, lA))
    beta = np.zeros((T, lA))
    gamma = np.zeros((T, lA))
    hmm.BaumWelch(L, T, O, alpha, beta, gamma)
    hmm.printhmm()
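    # As a quick illustration (not part of the original demo; alpha_chk and
    # p_chk are names introduced here), the plain Forward method can score a
    # training sequence under the re-estimated model:
    alpha_chk = np.zeros((T, lA))
    p_chk = [0.0]
    hmm.Forward(T, O[0], alpha_chk, p_chk)
    print("P(O[0] | trained model) =", p_chk[0])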
    # Evaluation problem:
    # given the model M = (A, B, pai) and an observation sequence
    # O = (o1, o2, ..., oT), compute the probability P(O|M) of observing O
    # under M. Worked out with the forward and backward algorithms below.
    # Transition matrix A
    A = [
        [0.5, 0.2, 0.3],
        [0.3, 0.5, 0.2],
        [0.2, 0.3, 0.5],
    ]
    # Observation matrix B
    B = [
        [0.5, 0.5],
        [0.4, 0.6],
        [0.7, 0.3],
    ]
    # State set Q
    Q = [0, 1, 2]
    # Observation set V
    V = [0, 1]
    # Initial distribution pai: the probability of being in state qi at time t=1
    pai = [0.2, 0.4, 0.4]
    # Number of states
    lA = len(A)
    # Length of the observation sequence
    T = 3
    # Observation sequence
    O = [0, 1, 0]
    # Forward computation.
    # Given the model lambda, the forward probability alpha(t, i) is the
    # probability of the partial observation sequence o1, o2, ..., ot with the
    # chain in state qi at time t.
    # Distribution over states after observing o1
    alpha = [pai[i] * B[i][O[0]] for i in range(lA)]
    # Propagate through the remaining observations o2, ..., oT
    for t in range(1, T):
        alpha = [
            sum(alpha[j] * A[j][i] for j in range(lA)) * B[i][O[t]]
            for i in range(lA)
        ]
    P_O_M = sum(alpha[i] for i in range(lA))
    print(P_O_M)
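    # Sanity check: carrying out these sums by hand for this model gives
    # P(O|M) = 0.130218, which is the value the print above should show.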
    # Backward computation.
    # Given the model lambda, the backward probability beta(t, i) is the
    # probability of the partial observation sequence o(t+1), o(t+2), ..., oT
    # given that the chain is in state qi at time t.
    beta = [1 for i in range(lA)]
    for t in range(T - 1, 0, -1):
        beta = [
            sum(A[i][j] * B[j][O[t]] * beta[j] for j in range(lA))
            for i in range(lA)
        ]
    P_O_M = sum(pai[i] * B[i][O[0]] * beta[i] for i in range(lA))
    print(P_O_M)
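    # The backward pass evaluates the same probability, so this print should
    # again show 0.130218, matching the forward result.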
    # Learning problem:
    # given an observation sequence O = (o1, o2, ..., oT), estimate the model
    # parameters M = (A, B, pai) so that P(O|M) is maximized,
    # i.e. estimate the parameters by maximum likelihood.
    # TODO (a sketch follows below)
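    # A minimal sketch of one way to do this (my illustration; hmm2, alpha2,
    # beta2, and gamma2 are names assumed here): reuse the BaumWelch method
    # above, starting from this section's model and re-estimating it from the
    # single observation sequence O.
    hmm2 = HMM(A, B, pai)
    alpha2 = np.zeros((T, lA))
    beta2 = np.zeros((T, lA))
    gamma2 = np.zeros((T, lA))
    hmm2.BaumWelch(1, T, [O], alpha2, beta2, gamma2)
    hmm2.printhmm()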
    # Prediction problem, also called the decoding problem:
    # given the model M = (A, B, pai) and an observation sequence
    # O = (o1, o2, ..., oT), find the most probable state sequence
    # I = (i1, i2, ..., iT) for the given observations.
    # TODO (a sketch follows below)
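    # A minimal sketch (my illustration; hmm3 is an assumed name): decode
    # O = [0, 1, 0] with the viterbi method above. Working the recursion by
    # hand, the optimal path is [2, 2, 2] with probability 0.0147.
    hmm3 = HMM(A, B, pai)
    path, prob = hmm3.viterbi(O)
    print("optimal state path:", path, " probability:", prob)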