2022年 11月 5日

python 实现HMM

python 实现 HMM算法

个人学习记录备份,内容只对本人有效,不保证正确性

# -*-coding:UTF-8-*-

import numpy as np

class HMM:
    def __init__(self, Ann, Bnm, pi1n):
        # 状态转移概率矩阵
        self.A = np.array(Ann)
        # 观测概率矩阵
        self.B = np.array(Bnm)
        # 初始概率分布
        self.pi = np.array(pi1n)
        # 状态的数量
        self.N = self.A.shape[0]
        # 观测结果的数量
        self.M = self.B.shape[1]

    def printhmm(self):
        print("==================================================")
        print("HMM content: N =", self.N, ",M =", self.M)
        for i in range(self.N):
            if i == 0:
                print("hmm.A ", self.A[i, :], " hmm.B ", self.B[i, :])
            else:
                print("      ", self.A[i, :], "       ", self.B[i, :])
        print("hmm.pi", self.pi)
        print("==================================================")

    # 函数名称:Forward *功能:前向算法估计参数 *参数:phmm:指向HMM的指针
    # T:观察值序列的长度 O:观察值序列
    # alpha:运算中用到的临时数组 pprob:返回值,所要求的概率
    def Forward(self, T, O, alpha, pprob):
        #     1. Initialization 初始化
        for i in range(self.N):
            alpha[0, i] = self.pi[i] * self.B[i, O[0]]

        #     2. Induction 递归
        for t in range(T - 1):
            for j in range(self.N):
                sum = 0.0
                for i in range(self.N):
                    sum += alpha[t, i] * self.A[i, j]
                alpha[t + 1, j] = sum * self.B[j, O[t + 1]]
        #     3. Termination 终止
        sum = 0.0
        for i in range(self.N):
            sum += alpha[T - 1, i]
        pprob[0] *= sum

    # 带修正的前向算法
    def ForwardWithScale(self, T, O, alpha, scale, pprob):
        scale[0] = 0.0
        #     1. Initialization
        for i in range(self.N):
            alpha[0, i] = self.pi[i] * self.B[i, O[0]]
            scale[0] += alpha[0, i]

        for i in range(self.N):
            alpha[0, i] /= scale[0]

        #     2. Induction
        for t in range(T - 1):
            scale[t + 1] = 0.0
            for j in range(self.N):
                sum = 0.0
                for i in range(self.N):
                    sum += alpha[t, i] * self.A[i, j]

                alpha[t + 1, j] = sum * self.B[j, O[t + 1]]
                scale[t + 1] += alpha[t + 1, j]
            for j in range(self.N):
                alpha[t + 1, j] /= scale[t + 1]

        #     3. Termination
        for t in range(T):
            pprob[0] += np.log(scale[t])

    # 函数名称:Backward * 功能:后向算法估计参数 * 参数:phmm:指向HMM的指针
    # T:观察值序列的长度 O:观察值序列
    # beta:运算中用到的临时数组 pprob:返回值,所要求的概率
    def Backword(self, T, O, beta, pprob):
        #     1. Intialization
        for i in range(self.N):
            beta[T - 1, i] = 1.0
        #     2. Induction
        for t in range(T - 2, -1, -1):
            for i in range(self.N):
                sum = 0.0
                for j in range(self.N):
                    sum += self.A[i, j] * self.B[j, O[t + 1]] * beta[t + 1, j]
                beta[t, i] = sum

        #     3. Termination
        pprob[0] = 0.0
        for i in range(self.N):
            pprob[0] += self.pi[i] * self.B[i, O[0]] * beta[0, i]

    # 带修正的后向算法
    def BackwardWithScale(self, T, O, beta, scale):
        #     1. Intialization
        for i in range(self.N):
            beta[T - 1, i] = 1.0

        #     2. Induction
        for t in range(T - 2, -1, -1):
            for i in range(self.N):
                sum = 0.0
                for j in range(self.N):
                    sum += self.A[i, j] * self.B[j, O[t + 1]] * beta[t + 1, j]
                beta[t, i] = sum / scale[t + 1]

    # Viterbi算法
    # 输入:A,B,pi,O 输出P(O|lambda)最大时Poptimal的路径I
    def viterbi(self, O):
        T = len(O)
        # 初始化
        delta = np.zeros((T, self.N), np.float)
        phi = np.zeros((T, self.N), np.float)
        I = np.zeros(T)
        for i in range(self.N):
            delta[0, i] = self.pi[i] * self.B[i, O[0]]
            phi[0, i] = 0
        # 递推
        for t in range(1, T):
            for i in range(self.N):
                delta[t, i] = (
                    self.B[i, O[t]]
                    * np.array(
                        [delta[t - 1, j] * self.A[j, i] for j in range(self.N)]
                    ).max()
                )
                phi[t, i] = np.array(
                    [delta[t - 1, j] * self.A[j, i] for j in range(self.N)]
                ).argmax()
        # 终结
        prob = delta[T - 1, :].max()
        I[T - 1] = delta[T - 1, :].argmax()
        # 状态序列求取
        for t in range(T - 2, -1, -1):
            I[t] = phi[t + 1, I[t + 1]]
        return I, prob

    # 计算gamma : 时刻t时马尔可夫链处于状态Si的概率
    def ComputeGamma(self, T, alpha, beta, gamma):
        for t in range(T):
            denominator = 0.0
            for j in range(self.N):
                gamma[t, j] = alpha[t, j] * beta[t, j]
                denominator += gamma[t, j]
            for i in range(self.N):
                gamma[t, i] = gamma[t, i] / denominator

    # 计算sai(i,j) 为给定训练序列O和模型lambda时:
    # 时刻t是马尔可夫链处于Si状态,二时刻t+1处于Sj状态的概率
    def ComputeXi(self, T, O, alpha, beta, gamma, xi):
        for t in range(T - 1):
            sum = 0.0
            for i in range(self.N):
                for j in range(self.N):
                    xi[t, i, j] = (
                        alpha[t, i]
                        * beta[t + 1, j]
                        * self.A[i, j]
                        * self.B[j, O[t + 1]]
                    )
                    sum += xi[t, i, j]
            for i in range(self.N):
                for j in range(self.N):
                    xi[t, i, j] /= sum

    # Baum-Welch算法
    # 初始模型:HMM={A,B,pi,N,M}
    # 输入 L个观测序列O
    def BaumWelch(self, L, T, O, alpha, beta, gamma):
        print("BaumWelch")
        DELTA = 0.01
        round = 0
        flag = 1
        probf = [0.0]
        delta = 0.0
        deltaprev = 0.0
        probprev = 0.0
        ratio = 0.0
        deltaprev = 10e-70

        xi = np.zeros((T, self.N, self.N))
        pi = np.zeros((T), np.float)
        denominatorA = np.zeros((self.N), np.float)
        denominatorB = np.zeros((self.N), np.float)
        numeratorA = np.zeros((self.N, self.N), np.float)
        numeratorB = np.zeros((self.N, self.M), np.float)
        scale = np.zeros((T), np.float)

        while True:
            probf[0] = 0
            # E - step
            for l in range(L):
                self.ForwardWithScale(T, O[l], alpha, scale, probf)
                self.BackwardWithScale(T, O[l], beta, scale)
                self.ComputeGamma(T, alpha, beta, gamma)
                self.ComputeXi(T, O[l], alpha, beta, gamma, xi)
                for i in range(self.N):
                    pi[i] += gamma[0, i]
                    for t in range(T - 1):
                        denominatorA[i] += gamma[t, i]
                        denominatorB[i] += gamma[t, i]
                    denominatorB[i] += gamma[T - 1, i]

                    for j in range(self.N):
                        for t in range(T - 1):
                            numeratorA[i, j] += xi[t, i, j]
                    for k in range(self.M):
                        for t in range(T):
                            if O[l][t] == k:
                                numeratorB[i, k] += gamma[t, i]

            # M - step
            # 重估状态转移矩阵 和 观察概率矩阵
            for i in range(self.N):
                self.pi[i] = 0.001 / self.N + 0.999 * pi[i] / L
                for j in range(self.N):
                    self.A[i, j] = (
                        0.001 / self.N + 0.999 * numeratorA[i, j] / denominatorA[i]
                    )
                    numeratorA[i, j] = 0.0

                for k in range(self.M):
                    self.B[i, k] = (
                        0.001 / self.M + 0.999 * numeratorB[i, k] / denominatorB[i]
                    )
                    numeratorB[i, k] = 0.0

                pi[i] = denominatorA[i] = denominatorB[i] = 0.0

            if flag == 1:
                flag = 0
                probprev = probf[0]
                ratio = 1
                continue

            delta = probf[0] - probprev
            ratio = delta / deltaprev
            probprev = probf[0]
            deltaprev = delta
            round += 1

            if ratio <= DELTA:
                print("num iteration ", round)
                break


if __name__ == "__main__":
    print("python my HMM")
    # 例子: 两枚硬币抛正反面,状态
    # 状态集合Q: [q0, q1], q0: 0号硬币, q1: 1号硬币
    # 状态转移概率矩阵A: [[q0 -> q0, q0 -> q1], [q1 -> q0, q1 -> q1]]
    A = [[0.75, 0.25], [0.25, 0.75]]

    # 观测集合V: [v0, v1], v0: 正面, v1: 反面
    # 观测概率矩阵B: [[v0|q0, v1|q0], [v0|q1, v1|q1]]
    B = [[0.51, 0.49], [0.48, 0.52]]

    # 初始概率分布pai: 表示时刻t=1处于状态qi的概率。
    pai = [0.5, 0.5]

    # 初始化 hmm
    hmm = HMM(A, B, pai)

    # 状态序列I 未知
    # 观测序列O 已知
    O = [
        [1, 0, 0, 1, 1, 0, 0, 0, 0],
        [1, 1, 0, 1, 0, 0, 1, 1, 0],
        [0, 0, 1, 1, 0, 0, 1, 1, 1],
    ]
    # 观测次数
    L = len(O)
    # 观测序列的长度
    T = len(O[0])
    # 状态的个数
    lA = len(A)
    # 观测结果的个数
    alpha = np.zeros((T, lA), np.float)
    beta = np.zeros((T, lA), np.float)
    gamma = np.zeros((T, lA), np.float)
    hmm.BaumWelch(L, T, O, alpha, beta, gamma)
    hmm.printhmm()

    # 概率计算问题
    # 给定模型M=(A, B, pai)和观测序列O=(o1, o2, ..., oT),计算模型M下观测序列O出现的概率P(O|M)
    # TODO

    # 状态转移概率矩阵A
    A = [
        [0.5, 0.2, 0.3],
        [0.3, 0.5, 0.2],
        [0.2, 0.3, 0.5],
    ]

    # 观测概率矩阵B
    B = [
        [0.5, 0.5],
        [0.4, 0.6],
        [0.7, 0.3],
    ]

    # 状态集合Q
    Q = [0, 1, 2]
    # 观测集合V
    V = [0, 1]

    # 初始概率分布pai: 只描述状态,表示时刻t=1处于状态qi的概率
    pai = [0.2, 0.4, 0.4]
    # 状态的个数
    lA = len(A)

    # 观测序列长度
    T = 3
    # 观测序列
    O = [0, 1, 0]

    # 前向算法计算
    # 给定隐马尔可夫模型λ,定义到时刻t部分观测序列为o1, o2, ..., ot,且状态为qi的概率为前向概率,记做alpha(t, i)
    # 首次可能观测到 o1 的概率分布
    alpha = [pai[i] * B[i][O[0]] for i in range(lA)]
    # 后续可能观测到 0t 的概率分布
    for t in range(1, T):
        alpha = [sum([alpha[j]*A[j][i] for j in range(lA)])*B[i][O[t]] for i in range(lA)]
    P_O_M = sum([alpha[i] for i in range(lA)])
    print(P_O_M)

    # 后向算法计算
    # 给定隐马尔可夫模型λ,定义在时刻t状态为qi的条件下,从t+1到T的部分观测序列为ot+1, ot+2, ..., oT的概率为后向概率,记做beta(t, i)
    beta = [1 for i in range(lA)]
    for t in range(T-1, 0, -1):
        beta = [sum([A[i][j] * B[j][O[t]] * beta[j] for j in range(lA)]) for i in range(lA)]
    P_O_M = sum([pai[i]*B[i][O[0]]*beta[i] for i in range(lA)])
    print(P_O_M)

    # 学习问题
    # 一直观测序列O=(o1, o2, ..., oT),估计模型M=(A, B, pai)的参数,使得在该模型下观测序列概率P(O|M)最大
    # 即 用极大似然估计的方法估计参数
    # TODO

    # 预测问题,也称为解码(decoding)问题
    # 已知模型M=(A, B, pai)和观测序列O=(o1,o2, ..., oT),求给定观测序列条件P(O|M)最大的状态序列I=(i1, i2, ..., iT)。
    # 即,给定观测序列,求最优可能的对应的状态序列。
    # TODO

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349