Hidden Markov Models (HMM) and a Python Implementation

Basic Introduction

First, let's look at the structure of the model to get an intuitive picture:

To describe the figure: there are two rows; the first row is the $y$ sequence and the second row is the $x$ sequence. Each $x$ has exactly one $y$ pointing to it, and each $y$ (after the first) also has a single preceding $y$ pointing to it.

OK, with the intuition out of the way, here are the definitions (following 《统计学习方法》, *Statistical Learning Methods*):

  • State sequence (the $y$'s in the figure; $I$ below)
    The sequence of states randomly generated by the hidden Markov chain is called the state sequence.
  • Observation sequence (the $x$'s in the figure; $O$ below):
    Each state generates an observation, and the resulting random sequence of observations is called the observation sequence.
  • Hidden Markov model
    A hidden Markov model is a probabilistic model of sequential data. It describes the process in which a hidden Markov chain randomly generates an unobservable sequence of states, and each state then generates an observation, yielding the observation sequence.

Formal Definition

Let $Q$ be the set of all possible states and $V$ the set of all possible observations:

$Q=\{q_1,q_2,…,q_N\},V=\{v_1,v_2,…,v_M\}$
where $N$ is the number of possible states and $M$ is the number of possible observations.

$I$ is a state sequence of length $T$, and $O$ is the corresponding observation sequence:
$I=(i_1,i_2,…,i_T),O=(o_1,o_2,…,o_T)$

$A$ is the state transition probability matrix: $A=[a_{ij}]_{N×N}$, $i=1,2,…,N;\ j=1,2,…,N$

where $a_{ij}$ is the probability of moving to state $q_j$ at time $t+1$, given that the chain is in state $q_i$ at time $t$:
$a_{ij}=P(i_{t+1}=q_j|i_t=q_i)$

$B$ is the observation probability matrix: $B=[b_j(k)]_{N×M}$, $k=1,2,…,M;\ j=1,2,…,N$

where $b_j(k)$ is the probability of generating observation $v_k$ at time $t$, given that the chain is in state $q_j$:
$b_j(k)=P(o_t=v_k|i_t=q_j)$

$π$ is the initial state probability vector: $π=(π_i)$,
where $π_i=P(i_1=q_i)$.

A hidden Markov model is determined by the initial state probability vector $π$, the state transition probability matrix $A$, and the observation probability matrix $B$. $π$ and $A$ determine the state sequence, while $B$ determines the observation sequence. A hidden Markov model $λ$ can therefore be written as the triple $λ=(A,B,π)$; $A$, $B$, and $π$ are called the three elements of a hidden Markov model.

The two basic assumptions of hidden Markov models

  1. The state of the hidden Markov chain at any time $t$ depends only on its state at the previous time step; it is independent of the states and observations at all other times, and of the time $t$ itself (the homogeneous Markov assumption).
  2. The observation at any time depends only on the state of the Markov chain at that time; it is independent of all other observations and states (the observation independence assumption). Both assumptions are written as equations below.
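
Written as equations, in the notation introduced above, the two assumptions say, for $t=1,2,…,T$:

$$P(i_t|i_{t-1},o_{t-1},…,i_1,o_1)=P(i_t|i_{t-1})$$

$$P(o_t|i_T,o_T,…,i_{t+1},o_{t+1},i_t,i_{t-1},o_{t-1},…,i_1,o_1)=P(o_t|i_t)$$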

An Example: Fever Diagnosis

With the definitions done, let's work through an example. It follows the fever-prediction example from hankcs and Zhihu (both ultimately come from Wikipedia: https://en.wikipedia.org/wiki/Viterbi_algorithm#Example ), described here in the simplest possible terms.

Suppose you are a doctor with a patient in front of you, and your task is to determine whether the patient has a fever.

  • First, the patient's state ($Q$) can only be one of two: {Fever, Healthy}.
  • The patient's feelings (the observations $V$) can be one of three: {normal, cold, dizzy}.
  • You have the patient's medical records, and from the first day of each record you can estimate $π$ (the initial state probability vector);
  • from the rest of the records you can estimate $A$ (the state transition matrix), i.e. how whether the patient has a fever on one day relates to whether he has a fever the next day;
  • and you can estimate $B$ (the observation probability matrix), i.e. how the patient's feeling on a given day relates to whether he has a fever that day.

import numpy as np

# The state set Q
states = ('Healthy', 'Fever')
# The observation set V
observations = ('normal', 'cold', 'dizzy')
# The initial state probability vector pi
start_probability = {'Healthy': 0.6, 'Fever': 0.4}
# The state transition matrix A
transition_probability = {
    'Healthy': {'Healthy': 0.7, 'Fever': 0.3},
    'Fever': {'Healthy': 0.4, 'Fever': 0.6},
}
# The observation probability matrix B
emission_probability = {
    'Healthy': {'normal': 0.5, 'cold': 0.4, 'dizzy': 0.1},
    'Fever': {'normal': 0.1, 'cold': 0.3, 'dizzy': 0.6},
}

# Randomly generate an observation sequence and a state sequence
# (uses the global numpy arrays A, B and pi defined further below)
def simulate(T):

    def draw_from(probs):
        """
        1. np.random.multinomial:
           draws samples from a multinomial distribution
           >>> np.random.multinomial(20, [1/6.]*6, size=2)
           array([[3, 4, 3, 3, 4, 3],
                  [2, 4, 3, 4, 0, 7]])
           For the first run, we threw 3 times 1, 4 times 2, etc.
           For the second, we threw 2 times 1, 4 times 2, etc.
        2. np.where:
           >>> x = np.arange(9.).reshape(3, 3)
           >>> np.where( x > 5 )
           (array([2, 2, 2]), array([0, 1, 2]))
        """
        return np.where(np.random.multinomial(1, probs) == 1)[0][0]

    observations = np.zeros(T, dtype=int)
    states = np.zeros(T, dtype=int)
    states[0] = draw_from(pi)
    observations[0] = draw_from(B[states[0], :])
    for t in range(1, T):
        states[t] = draw_from(A[states[t-1], :])
        observations[t] = draw_from(B[states[t], :])
    return observations, states

def generate_index_map(labels):
    id2label = {}
    label2id = {}
    i = 0
    for l in labels:
        id2label[i] = l
        label2id[l] = i
        i += 1
    return id2label, label2id

states_id2label, states_label2id = generate_index_map(states)
observations_id2label, observations_label2id = generate_index_map(observations)
print(states_id2label, states_label2id)
print(observations_id2label, observations_label2id)
{0: 'Healthy', 1: 'Fever'} {'Healthy': 0, 'Fever': 1}
{0: 'normal', 1: 'cold', 2: 'dizzy'} {'normal': 0, 'cold': 1, 'dizzy': 2}
def convert_map_to_vector(map_, label2id):
    """Convert a probability vector from a dict to a 1-D numpy array"""
    v = np.zeros(len(map_), dtype=float)
    for e in map_:
        v[label2id[e]] = map_[e]
    return v


def convert_map_to_matrix(map_, label2id1, label2id2):
    """Convert a probability matrix from a nested dict to a 2-D numpy array"""
    m = np.zeros((len(label2id1), len(label2id2)), dtype=float)
    for line in map_:
        for col in map_[line]:
            m[label2id1[line]][label2id2[col]] = map_[line][col]
    return m
A = convert_map_to_matrix(transition_probability, states_label2id, states_label2id)
print(A)
B = convert_map_to_matrix(emission_probability, states_label2id, observations_label2id)
print(B)
observations_index = [observations_label2id[o] for o in observations]
pi = convert_map_to_vector(start_probability, states_label2id)
print(pi)
[[ 0.7  0.3]
 [ 0.4  0.6]]
[[ 0.5  0.4  0.1]
 [ 0.1  0.3  0.6]]
[ 0.6  0.4]
# Generate simulated data
observations_data, states_data = simulate(10)
print(observations_data)
print(states_data)
# The corresponding labels
print("Patient states: ", [states_id2label[index] for index in states_data])
print("Patient observations: ", [observations_id2label[index] for index in observations_data])
[0 0 1 1 2 1 2 2 2 0]
[0 0 0 0 1 1 1 1 1 0]
Patient states:  ['Healthy', 'Healthy', 'Healthy', 'Healthy', 'Fever', 'Fever', 'Fever', 'Fever', 'Fever', 'Healthy']
Patient observations:  ['normal', 'normal', 'cold', 'cold', 'dizzy', 'cold', 'dizzy', 'dizzy', 'dizzy', 'normal']

2. The Three Fundamental Problems of HMMs

In practice, working with an HMM usually means solving one of three problems:

  • 1. Probability calculation: given the model $λ=(A,B,π)$
    and an observation sequence $O=\{o_1,o_2,…,o_T\}$, compute the probability $P(O|λ)$ of observing $O$ under the model $λ$.
  • 2. Learning: given an observation sequence $O=\{o_1,o_2,…,o_T\}$, estimate the model parameters $λ=(A,B,π)$ that maximize $P(O|λ)$, i.e. estimate the parameters by maximum likelihood.
  • 3. Prediction (also called decoding): given an observation sequence $O=\{o_1,o_2,…,o_T\}$ and a model $λ=(A,B,π)$, find the state sequence $I=(i_1,i_2,…,i_T)$ that maximizes the conditional probability $P(I|O)$, i.e. the most likely state sequence for the given observations.

In terms of the fever example, the three problems are:

  • 1. Probability calculation: given the model parameters, compute the probability of a particular sequence of observed symptoms.
  • 2. Learning: given a sequence of observed symptoms, learn the model parameters.
  • 3. Prediction: using the learned model, infer whether the patient had a fever on each of those days.

2.1 The Probability Calculation Problem

The probability calculation problem asks: under the model $λ$, what is the probability $P(O|λ)$ of the observation sequence $O$?

Direct computation

The probability of a particular state sequence $I=(i_1,i_2,…,i_T)$ is $P(I|\lambda)=\pi_{i_1}a_{i_1i_2}a_{i_2i_3}…a_{i_{T-1}i_T}$.

For such a state sequence, the probability of producing the observation sequence $O=(o_1,o_2,…,o_T)$ is $P(O|I,\lambda)=b_{i_1}(o_1)b_{i_2}(o_2)…b_{i_T}(o_{T})$.

The joint probability of $I$ and $O$ is:
$$P(O,I|\lambda)=P(O|I,\lambda)P(I|\lambda)=\pi_{i_1}b_{i_1}(o_1)a_{i_1i_2}b_{i_2}(o_2)…a_{i_{T-1}i_T}b_{i_T}(o_{T})$$

Summing over all possible $I$ gives:
$$P(O|λ)=\sum_IP(O,I|\lambda)=\sum_{i_1,…,i_T}\pi_{i_1}b_{i_1}(o_1)a_{i_1i_2}b_{i_2}(o_2)…a_{i_{T-1}i_T}b_{i_T}(o_{T})$$
Computed directly, this is far too expensive: there are $N^T$ possible state sequences and each term costs $O(T)$, so the time complexity is $O(TN^T)$.
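
For intuition only, here is a brute-force sketch of the direct computation, assuming the numpy arrays A, B and pi built above (the helper name brute_force_prob is made up for this illustration). It enumerates all $N^T$ state sequences and is only practical for tiny $T$:

import itertools

def brute_force_prob(obs_seq):
    """P(O|lambda) by enumerating every possible state sequence: O(T * N^T)."""
    N = A.shape[0]
    T = len(obs_seq)
    total = 0.0
    for seq in itertools.product(range(N), repeat=T):   # all N^T state sequences
        p = pi[seq[0]] * B[seq[0], obs_seq[0]]
        for t in range(1, T):
            p *= A[seq[t-1], seq[t]] * B[seq[t], obs_seq[t]]
        total += p
    return total

# e.g. brute_force_prob([0, 1, 2]) should match the forward-algorithm result below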

The Forward Algorithm (or the Backward Algorithm)

First we introduce the forward probability.

Given the model $λ$, the forward probability is defined as the probability of the partial observation sequence $o_1,o_2,…,o_t$ up to time $t$ together with being in state $q_i$ at time $t$. It is written:

$$α_t(i)=P(o_1,o_2,…,o_t,i_t=q_i|λ)$$

In terms of the fever example: this is the joint probability of the patient's state on a given day and all of the observed symptoms up to and including that day.

The backward probability is defined analogously.

The forward algorithm

Input: a hidden Markov model $λ$ and an observation sequence $O$;
Output: the observation sequence probability $P(O|λ)$.

  • Initialization $(t=1)$: $α_1(i)=P(o_1,i_1=q_i|λ)=π_ib_i(o_1)$, $i=1,2,…,N$
  • Recursion: for $t=1,2,…,T-1$, $α_{t+1}(i)=[\sum^N_{j=1}α_t(j)a_{ji}]b_i(o_{t+1})$
  • Termination: $P(O|λ)=\sum^N_{i=1}α_T(i)$
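
As a quick check against the fever model defined above, suppose the first two observations are 'normal' and then 'cold'. The initialization step and one recursion step give:

$$α_1(\text{Healthy})=0.6×0.5=0.3,\qquad α_1(\text{Fever})=0.4×0.1=0.04$$

$$α_2(\text{Healthy})=[α_1(\text{Healthy})·0.7+α_1(\text{Fever})·0.4]×0.4=(0.21+0.016)×0.4=0.0904$$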

Understanding the forward algorithm:

The forward algorithm stores the forward probabilities at every time step, so that computing the forward probabilities at the next step only requires the $N$ forward probabilities from the previous step. In effect it trades space for time. The resulting **time complexity is $O(N^2T)$**.

Python implementation of the forward/backward algorithms:

def forward(obs_seq):
    """The forward algorithm"""
    N = A.shape[0]
    T = len(obs_seq)

    # F holds the matrix of forward probabilities
    F = np.zeros((N, T))
    F[:, 0] = pi * B[:, obs_seq[0]]

    for t in range(1, T):
        for n in range(N):
            F[n, t] = np.dot(F[:, t-1], (A[:, n])) * B[n, obs_seq[t]]

    return F

def backward(obs_seq):
    """The backward algorithm"""
    N = A.shape[0]
    T = len(obs_seq)
    # X holds the matrix of backward probabilities
    X = np.zeros((N, T))
    X[:, -1:] = 1

    for t in reversed(range(T-1)):
        for n in range(N):
            X[n, t] = np.sum(X[:, t+1] * A[n, :] * B[:, obs_seq[t+1]])

    return X
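
A quick sketch of how these can be used, assuming the A, B, pi arrays and the observations_data sequence generated earlier (the variable names below are just for illustration). $P(O|λ)$ can be read off either from the last column of the forward matrix or from the first column of the backward matrix, and the two values should agree:

obs = observations_data                 # the simulated observation sequence from above
alpha = forward(obs)
beta = backward(obs)

# P(O|lambda) from the forward matrix: sum of alpha_T(i) over states
p_forward = np.sum(alpha[:, -1])
# P(O|lambda) from the backward matrix: sum_i pi_i * b_i(o_1) * beta_1(i)
p_backward = np.sum(pi * B[:, obs[0]] * beta[:, 0])

print(p_forward, p_backward)            # the two values should be numerically equal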

2.2 The Learning Problem

For the learning problem we only consider unsupervised learning here; in the supervised case, where labeled data are available, the model parameters can be estimated straightforwardly by maximum likelihood.

In the unsupervised case we only have a pile of observation data. In the fever example, this means we know how the patient felt on each of the previous days, but not whether he was ever actually diagnosed with a fever.

In this situation we can use the EM algorithm, treating the state variables as latent variables. The EM algorithm applied to learning HMM parameters is known as the Baum-Welch algorithm.

The model can be written as:
$$P(O|λ)=\sum_IP(O|I,λ)P(I|λ)$$

The Baum-Welch Algorithm

(1). Determine the complete-data log-likelihood.
The complete data are $(O,I)=(o_1,o_2,…,o_T,i_1,i_2,…,i_T)$.

The complete-data log-likelihood is $\log P(O,I|λ)$.

(2). The E step of EM:

$$Q(λ,\hatλ)=\sum_I\log P(O,I|λ)P(O,I|\hatλ)$$
Note that the factor $\frac{1}{P(O|\hatλ)}$, which is constant with respect to $λ$, has been dropped here.

Here $\hatλ$ is the current estimate of the hidden Markov model parameters, and $λ$ are the hidden Markov model parameters to be maximized over.

We also have:
$$P(O,I|λ)=π_{i_1}b_{i_1}(o_1)a_{i_1i_2}b_{i_2}(o_2)…a_{i_{T-1}i_T}b_{i_T}(o_T)$$

So $Q(λ,\hatλ)$ can be written as:
$$Q(λ,\hatλ)=\sum_I\log π_{i_1}P(O,I|\hatλ)+\sum_I\left(\sum^{T-1}_{t=1}\log a_{i_ti_{t+1}}\right)P(O,I|\hatλ)+\sum_I\left(\sum^{T}_{t=1}\log b_{i_t}(o_t)\right)P(O,I|\hatλ)$$
The three terms involve $π$, $A$, and $B$ separately, so each can be maximized independently in the M step.

(3). The M step of EM:

Maximize the Q function $Q(λ,\hatλ)$ with respect to the model parameters $A,B,π$.

Applying Lagrange multipliers and setting the partial derivatives with respect to each parameter to zero gives the Baum-Welch parameter estimation formulas:

$$a_{ij}=\frac{\sum_{t=1}^{T-1}ξ_t(i,j)}{\sum_{t=1}^{T-1}γ_t(i)}$$

$$b_j(k)=\frac{\sum^T_{t=1,o_t=v_k}γ_t(j)}{\sum_{t=1}^{T}γ_t(j)}$$

$$π_i=γ_1(i)$$

where $γ_t(i)$ and $ξ_t(i,j)$ are defined as follows:

$$γ_t(i)=P(i_t=q_i|O,λ)=\frac{P(i_t=q_i,O|λ)}{P(O|λ)}=\frac{α_t(i)β_t(i)}{\sum_{j=1}^Nα_t(j)β_t(j)}$$

Read "gamma": the probability of being in state $q_i$ at time $t$, given the model parameters and the full observation sequence.

$$ξ_t(i,j)=P(i_t=q_i,i_{t+1}=q_j|O,λ)=\frac{P(i_t=q_i,i_{t+1}=q_j,O|λ)}{P(O|λ)}$$

$$=\frac{P(i_t=q_i,i_{t+1}=q_j,O|λ)}{\sum_{i=1}^N\sum_{j=1}^NP(i_t=q_i,i_{t+1}=q_j,O|λ)}$$

Read "xi": the probability of being in state $q_i$ at time $t$ and in state $q_j$ at time $t+1$, given the model parameters and the full observation sequence.

Substituting $P(i_t=q_i,i_{t+1}=q_j,O|λ)=α_t(i)a_{ij}b_j(o_{t+1})β_{t+1}(j)$,

we get: $ξ_t(i,j)=\frac{α_t(i)a_{ij}b_j(o_{t+1})β_{t+1}(j)}{\sum_{i=1}^N\sum_{j=1}^Nα_t(i)a_{ij}b_j(o_{t+1})β_{t+1}(j)}$
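
As a small illustrative sketch, separate from the implementation below, $γ$ and $ξ$ can be computed directly from the forward and backward matrices; this assumes the forward/backward functions and the A, B arrays defined earlier (the helper name compute_gamma_xi is made up here):

def compute_gamma_xi(obs_seq):
    """gamma[i, t] = P(i_t = q_i | O, lambda); xi[i, j, t] = P(i_t = q_i, i_{t+1} = q_j | O, lambda)."""
    alpha = forward(obs_seq)
    beta = backward(obs_seq)
    N, T = alpha.shape

    # gamma_t(i) = alpha_t(i) * beta_t(i) / sum_j alpha_t(j) * beta_t(j)
    gamma = alpha * beta
    gamma /= gamma.sum(axis=0, keepdims=True)

    # xi_t(i, j) proportional to alpha_t(i) * a_ij * b_j(o_{t+1}) * beta_{t+1}(j), normalized over (i, j)
    xi = np.zeros((N, N, T - 1))
    for t in range(T - 1):
        xi[:, :, t] = alpha[:, t].reshape(-1, 1) * A * B[:, obs_seq[t+1]] * beta[:, t+1]
        xi[:, :, t] /= xi[:, :, t].sum()
    return gamma, xi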

Python implementation of the Baum-Welch algorithm:

def baum_welch_train(observations, A, B, pi, criterion=0.05):
    """Unsupervised learning: the Baum-Welch algorithm"""
    n_states = A.shape[0]
    n_samples = len(observations)

    done = False
    while not done:
        # alpha_t(i) = P(O_1 O_2 ... O_t, q_t = S_i | hmm)
        alpha = forward(observations)

        # beta_t(i) = P(O_t+1 O_t+2 ... O_T | q_t = S_i, hmm)
        beta = backward(observations)

        # xi_t(i,j) = P(i_t = q_i, i_{t+1} = q_j | O, lambda)
        xi = np.zeros((n_states, n_states, n_samples-1))
        for t in range(n_samples-1):
            denom = np.dot(np.dot(alpha[:, t].T, A) * B[:, observations[t+1]].T, beta[:, t+1])
            for i in range(n_states):
                numer = alpha[i, t] * A[i, :] * B[:, observations[t+1]].T * beta[:, t+1].T
                xi[i, :, t] = numer / denom

        # gamma_t(i) = P(q_t = S_i | O, hmm)
        gamma = np.sum(xi, axis=1)
        # Need the final gamma element for the new B:
        # xi's third dimension has length n_samples-1, one short, so compute the last gamma separately
        prod = (alpha[:, n_samples-1] * beta[:, n_samples-1]).reshape((-1, 1))
        gamma = np.hstack((gamma, prod / np.sum(prod)))  # append one more column to gamma

        # Update the model parameters
        newpi = gamma[:, 0]
        newA = np.sum(xi, 2) / np.sum(gamma[:, :-1], axis=1).reshape((-1, 1))
        newB = np.copy(B)
        num_levels = B.shape[1]
        sumgamma = np.sum(gamma, axis=1)
        for lev in range(num_levels):
            mask = observations == lev
            newB[:, lev] = np.sum(gamma[:, mask], axis=1) / sumgamma

        # Stop once the parameter change falls below the threshold
        if np.max(abs(pi - newpi)) < criterion and \
           np.max(abs(A - newA)) < criterion and \
           np.max(abs(B - newB)) < criterion:
            done = True
        A[:], B[:], pi[:] = newA, newB, newpi
    return newA, newB, newpi

Returning to the fever-prediction problem: below we first build an HMM ourselves and simulate an observation sequence and a state sequence from it.

Then we learn the model from the observation sequence alone to recover the model parameters.

A = np.array([[0.5, 0.5],[0.5, 0.5]])
B = np.array([[0.3, 0.3, 0.3],[0.3, 0.3, 0.3]])
pi = np.array([0.5, 0.5])

observations_data, states_data = simulate(100)
newA, newB, newpi = baum_welch_train(observations_data, A, B, pi)
print("newA: ", newA)
print("newB: ", newB)
print("newpi: ", newpi)
newA:  [[ 0.5  0.5]
 [ 0.5  0.5]]
newB:  [[ 0.28  0.32  0.4 ]
 [ 0.28  0.32  0.4 ]]
newpi:  [ 0.5  0.5]

2.3 The Prediction Problem

The prediction problem asks for the state sequence $I=(i_1,i_2,…,i_T)$ that maximizes the conditional probability $P(I|O)$ for a given observation sequence. This is analogous to a shortest-path problem:
maximizing $P(I|O)$ corresponds to minimizing the distance between nodes, which suggests a dynamic-programming approach, the Viterbi algorithm.

**First we introduce two quantities, $δ$ and $ψ$**:

Define $δ_t(i)$ as the maximum probability over all single paths $(i_1,i_2,i_3,…,i_t)$ ending in state $i$ at time $t$ (we work with $P(I,O)$ here for convenience: since $P(O)$ is given, $P(I|O)$ is proportional to $P(I,O)$):

$$δ_t(i)=\max_{i_1,i_2,…,i_{t-1}}P(i_t=i,i_{t-1},…,i_1,o_t,o_{t-1},…,o_1|λ)$$

Read "delta"; here $i=1,2,…,N$.

Its recurrence is:

$$δ_t(i)=\max_{1≤j≤N}[δ_{t-1}(j)a_{ji}]b_i(o_t)$$

Define $ψ_t(i)$ as the $(t-1)$-th node of the most probable path among all single paths $(i_1,i_2,i_3,…,i_{t-1},i)$ ending in state $i$ at time $t$:

$$ψ_t(i)=argmax_{1≤j≤N}[δ_{t-1}(j)a_{ji}]$$

Read "psi"; here $i=1,2,…,N$.

The Viterbi algorithm is as follows.
The Viterbi algorithm (dynamic programming):

Input: the model $λ=(A,B,π)$ and the observation sequence $O=(o_1,o_2,…,o_T)$
Output: the optimal path $I^*=(i^*_1,i^*_2,…,i^*_T)$

(1). Initialization:
$$δ_1(i)=π_ib_i(o_1)$$
$$ψ_1(i)=0$$

(2). Recursion. For $t=2,3,…,T$:
$$δ_t(i)=max_{1≤j≤N}[δ_{t-1}(j)a_{ji}]b_i(o_t)$$
$$ψ_t(i)=argmax_{1≤j≤N}[δ_{t-1}(j)a_{ji}]$$

(3). Termination:
$$P^*=max_{1≤i≤N}δ_T(i)$$

$$i^*_T=argmax_{1≤i≤N}δ_T(i)$$

(4). Optimal path backtracking. For $t=T-1,T-2,…,1$:
$$i^*_t=ψ_{t+1}(i^*_{t+1})$$

This yields the optimal path $I^*=(i_1^*,i_2^*,…,i_T^*)$.

Note: the parentheses in $b_i(o_t)$ and $ψ_{t+1}(i^*_{t+1})$ above do not denote function application; they behave more like array indexing.

Python implementation of the Viterbi algorithm (V corresponds to δ, prev corresponds to ψ):

def viterbi(obs_seq, A, B, pi):
    """
    Returns
    -------
    V : numpy.ndarray
        V[s][t] = Maximum probability of an observation sequence ending
                  at time 't' with final state 's'
    prev : numpy.ndarray
        Contains a pointer to the previous state at t-1 that maximizes
        V[state][t]

    V corresponds to delta, prev corresponds to psi
    """
    N = A.shape[0]
    T = len(obs_seq)
    prev = np.zeros((T - 1, N), dtype=int)

    # DP matrix containing max likelihood of state at a given time
    V = np.zeros((N, T))
    V[:, 0] = pi * B[:, obs_seq[0]]

    for t in range(1, T):
        for n in range(N):
            seq_probs = V[:, t-1] * A[:, n] * B[n, obs_seq[t]]
            prev[t-1, n] = np.argmax(seq_probs)
            V[n, t] = np.max(seq_probs)

    return V, prev

def build_viterbi_path(prev, last_state):
    """Returns a state path ending in last_state in reverse order
    (optimal path backtracking).
    """
    T = len(prev)
    yield(last_state)
    for i in range(T-1, -1, -1):
        yield(prev[i, last_state])
        last_state = prev[i, last_state]

def observation_prob(obs_seq):
    """ P( entire observation sequence | A, B, pi ) """
    return np.sum(forward(obs_seq)[:, -1])

def state_path(obs_seq, A, B, pi):
    """
    Returns
    -------
    V[last_state, -1] : float
        Probability of the optimal state path
    path : list(int)
        Optimal state path for the observation sequence
    """
    V, prev = viterbi(obs_seq, A, B, pi)
    # Build state path with greatest probability
    last_state = np.argmax(V[:, -1])
    path = list(build_viterbi_path(prev, last_state))

    return V[last_state, -1], reversed(path)

Continuing the fever-prediction example, we use the model parameters just learned to predict the state sequence and check the accuracy.

states_out = state_path(observations_data, newA, newB, newpi)[1]
p = 0.0
for s in states_data:
    if next(states_out) == s:
        p += 1

print(p / len(states_data))
0.54

Since the samples were randomly generated, the relatively low accuracy is understandable.

Using the Viterbi algorithm to compute the patient's condition and the corresponding probabilities:

A = convert_map_to_matrix(transition_probability, states_label2id, states_label2id)
B = convert_map_to_matrix(emission_probability, states_label2id, observations_label2id)
observations_index = [observations_label2id[o] for o in observations]
pi = convert_map_to_vector(start_probability, states_label2id)
V, p = viterbi(observations_index, newA, newB, newpi)
print(" " * 7, " ".join(("%10s" % observations_id2label[i]) for i in observations_index))
for s in range(0, 2):
    print("%7s: " % states_id2label[s] + " ".join("%10s" % ("%f" % v) for v in V[s]))
print('\nThe most possible states and probability are:')
p, ss = state_path(observations_index, newA, newB, newpi)
for s in ss:
    print(states_id2label[s])
print(p)
            normal       cold      dizzy
Healthy:   0.140000   0.022400   0.004480
  Fever:   0.140000   0.022400   0.004480

The most possible states and probability are:
Healthy
Healthy
Healthy
0.00448

3. Complete Code

The code mainly follows Hankcs's blog post; hankcs in turn adapted teaching code from Colorado State University (colostate).

The complete hidden Markov model code, wrapped in a class:

class HMM:
    """
    Order 1 Hidden Markov Model

    Attributes
    ----------
    A : numpy.ndarray
        State transition probability matrix
    B : numpy.ndarray
        Output emission probability matrix with shape (N, number of output types)
    pi : numpy.ndarray
        Initial state probability vector
    """
    def __init__(self, A, B, pi):
        self.A = A
        self.B = B
        self.pi = pi

    def simulate(self, T):

        def draw_from(probs):
            """
            1. np.random.multinomial:
               draws samples from a multinomial distribution
               >>> np.random.multinomial(20, [1/6.]*6, size=2)
               array([[3, 4, 3, 3, 4, 3],
                      [2, 4, 3, 4, 0, 7]])
               For the first run, we threw 3 times 1, 4 times 2, etc.
               For the second, we threw 2 times 1, 4 times 2, etc.
            2. np.where:
               >>> x = np.arange(9.).reshape(3, 3)
               >>> np.where( x > 5 )
               (array([2, 2, 2]), array([0, 1, 2]))
            """
            return np.where(np.random.multinomial(1, probs) == 1)[0][0]

        observations = np.zeros(T, dtype=int)
        states = np.zeros(T, dtype=int)
        states[0] = draw_from(self.pi)
        observations[0] = draw_from(self.B[states[0], :])
        for t in range(1, T):
            states[t] = draw_from(self.A[states[t-1], :])
            observations[t] = draw_from(self.B[states[t], :])
        return observations, states

    def _forward(self, obs_seq):
        """The forward algorithm"""
        N = self.A.shape[0]
        T = len(obs_seq)

        F = np.zeros((N, T))
        F[:, 0] = self.pi * self.B[:, obs_seq[0]]

        for t in range(1, T):
            for n in range(N):
                F[n, t] = np.dot(F[:, t-1], (self.A[:, n])) * self.B[n, obs_seq[t]]

        return F

    def _backward(self, obs_seq):
        """The backward algorithm"""
        N = self.A.shape[0]
        T = len(obs_seq)

        X = np.zeros((N, T))
        X[:, -1:] = 1

        for t in reversed(range(T-1)):
            for n in range(N):
                X[n, t] = np.sum(X[:, t+1] * self.A[n, :] * self.B[:, obs_seq[t+1]])

        return X

    def baum_welch_train(self, observations, criterion=0.05):
        """Unsupervised learning: the Baum-Welch algorithm"""
        n_states = self.A.shape[0]
        n_samples = len(observations)

        done = False
        while not done:
            # alpha_t(i) = P(O_1 O_2 ... O_t, q_t = S_i | hmm)
            alpha = self._forward(observations)

            # beta_t(i) = P(O_t+1 O_t+2 ... O_T | q_t = S_i, hmm)
            beta = self._backward(observations)

            xi = np.zeros((n_states, n_states, n_samples-1))
            for t in range(n_samples-1):
                denom = np.dot(np.dot(alpha[:, t].T, self.A) * self.B[:, observations[t+1]].T, beta[:, t+1])
                for i in range(n_states):
                    numer = alpha[i, t] * self.A[i, :] * self.B[:, observations[t+1]].T * beta[:, t+1].T
                    xi[i, :, t] = numer / denom

            # gamma_t(i) = P(q_t = S_i | O, hmm)
            gamma = np.sum(xi, axis=1)
            # Need the final gamma element for the new B
            prod = (alpha[:, n_samples-1] * beta[:, n_samples-1]).reshape((-1, 1))
            gamma = np.hstack((gamma, prod / np.sum(prod)))  # append one more column to gamma

            newpi = gamma[:, 0]
            newA = np.sum(xi, 2) / np.sum(gamma[:, :-1], axis=1).reshape((-1, 1))
            newB = np.copy(self.B)

            num_levels = self.B.shape[1]
            sumgamma = np.sum(gamma, axis=1)
            for lev in range(num_levels):
                mask = observations == lev
                newB[:, lev] = np.sum(gamma[:, mask], axis=1) / sumgamma

            if np.max(abs(self.pi - newpi)) < criterion and \
               np.max(abs(self.A - newA)) < criterion and \
               np.max(abs(self.B - newB)) < criterion:
                done = True

            self.A[:], self.B[:], self.pi[:] = newA, newB, newpi

    def observation_prob(self, obs_seq):
        """ P( entire observation sequence | A, B, pi ) """
        return np.sum(self._forward(obs_seq)[:, -1])

    def state_path(self, obs_seq):
        """
        Returns
        -------
        V[last_state, -1] : float
            Probability of the optimal state path
        path : list(int)
            Optimal state path for the observation sequence
        """
        V, prev = self.viterbi(obs_seq)

        # Build state path with greatest probability
        last_state = np.argmax(V[:, -1])
        path = list(self.build_viterbi_path(prev, last_state))

        return V[last_state, -1], reversed(path)

    def viterbi(self, obs_seq):
        """
        Returns
        -------
        V : numpy.ndarray
            V[s][t] = Maximum probability of an observation sequence ending
                      at time 't' with final state 's'
        prev : numpy.ndarray
            Contains a pointer to the previous state at t-1 that maximizes
            V[state][t]
        """
        N = self.A.shape[0]
        T = len(obs_seq)
        prev = np.zeros((T - 1, N), dtype=int)

        # DP matrix containing max likelihood of state at a given time
        V = np.zeros((N, T))
        V[:, 0] = self.pi * self.B[:, obs_seq[0]]

        for t in range(1, T):
            for n in range(N):
                seq_probs = V[:, t-1] * self.A[:, n] * self.B[n, obs_seq[t]]
                prev[t-1, n] = np.argmax(seq_probs)
                V[n, t] = np.max(seq_probs)

        return V, prev

    def build_viterbi_path(self, prev, last_state):
        """Returns a state path ending in last_state in reverse order."""
        T = len(prev)
        yield(last_state)
        for i in range(T-1, -1, -1):
            yield(prev[i, last_state])
            last_state = prev[i, last_state]
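
A minimal usage sketch of the class above, using the fever model's true parameters from earlier; the variable names and the initial guesses fed to the learner are arbitrary choices for this illustration, not part of the original code:

A = np.array([[0.7, 0.3],
              [0.4, 0.6]])
B = np.array([[0.5, 0.4, 0.1],
              [0.1, 0.3, 0.6]])
pi = np.array([0.6, 0.4])

h = HMM(A, B, pi)
observations_data, states_data = h.simulate(100)    # simulate from the true model

# Learn a new model from the observations alone, starting from rough initial guesses
h2 = HMM(np.array([[0.5, 0.5], [0.5, 0.5]]),
         np.array([[0.3, 0.3, 0.4], [0.3, 0.4, 0.3]]),
         np.array([0.5, 0.5]))
h2.baum_welch_train(observations_data)

# Decode the most likely state sequence under the learned model
prob, path = h2.state_path(observations_data)
print(prob, list(path))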