不像英文那样单词之间有空格作为天然的分界线, 中文词语之间没有明显界限。必须采用一些方法将中文语句划分为单词序列才能进一步处理, 这一划分步骤即是所谓的中文分词。
主流中文分词方法包括基于规则的分词,基于大规模语料库的统计学习方法以及在实践中应用最多的规则与统计综合方法。
隐马尔科夫模型(HMM)是中文分词中一类常用的统计模型, 本文将使用该模型构造分词器。关于HMM模型的介绍可以参见隐式马尔科夫模型.
方法介绍
中文分词问题可以表示为一个序列标注问题,定义两个类别:
E代表词语中最后一个字
B代表词的首个字
M代表词中间的字
S代表单字成词
对于分词结果:"我/只是/做了/一些/微小/的/工作",可以标注为"我E只B是E做B了E一B些E微B小E的S工B作E".
将标记序列"EBEBEBEBESBE"作为状态序列, 原始文本"我只是做了一些微小的工作"为观测序列. 分词过程即变成了求使给定观测序列出现概率最大的状态序列, 即解码问题。
这里需要说明一下,所谓出现概率最大是指在自然语言中出现概率最大。
根据语料库是否标注, HMM可以进行监督学习和无监督学习。若语料库未标注则需要使用EM算法进行无监督学习, 若语料库已经标注那么可以使用频率估计概率即监督学习方法。
程序实现
本文实现已在python3.5, OS X及Ubuntu操作系统上测试通过.
定义数据结构
上文中已定义用于标注序列的标签, 也即HMM中的状态:
STATES = {'B', 'M', 'E', 'S'}
除了用于分词外, HMM模型还可以用于处理很多问题. 因此, 我们将HMM模型封装为独立的类便于复用:
class HMModel:def __init__(self):self.trans_mat = {} # trans_mat[status][status] = intself.emit_mat = {} # emit_mat[status][observe] = intself.init_vec = {} # init_vec[status] = intself.state_count = {} # state_count[status] = intself.states = {}self.inited = False
其中:
trans_mat
: 状态转移矩阵,trans_mat[state1][state2]
表示训练集中由state1转移到state2的次数。emit_mat
: 观测矩阵,emit_mat[state][char]
表示训练集中单字char被标注为state的次数init_vec
: 初始状态分布向量,init_vec[state]
表示状态state在训练集中出现的次数state_count
: 状态统计向量,state_count[state]
word_set
: 词集合, 包含所有单词
初始化上述数据结构:
def setup(self):for state in self.states:# build trans_matself.trans_mat[state] = {}for target in self.states:self.trans_mat[state][target] = 0.0# build emit_matself.emit_mat[state] = {}# build init_vecself.init_vec[state] = 0# build state_countself.state_count[state] = 0self.inited = True
加载数据:
def load_data(self, filename):self.data = file(data_path(filename))self.setup()
训练数据集用空格分割单词:
如果 出现 这种 情况 ,
则 增加 的 居民 储蓄 存款 的 全部 或 一 部 只能 抵 作 积压 产品 占用 的 资金
而 不能 补充 流动资金 正常 需要量 的 增加
训练模型
因为使用标注数据集, 可以使用更简单的监督学习算法:
def do_train(self, observes, states):if not self.inited:self.setup()for i in range(len(states)):if i == 0:self.init_vec[states[0]] += 1self.state_count[states[0]] += 1else:self.trans_mat[states[i - 1]][states[i]] += 1self.state_count[states[i]] += 1if observes[i] not in self.emit_mat[states[i]]:self.emit_mat[states[i]][observes[i]] = 1else:self.emit_mat[states[i]][observes[i]] += 1
训练函数输入观测序列和状态序列进行训练, 依次更新各矩阵数据.
类中维护的模型参数均为频数而非频率, 这样的设计使得模型可以进行在线训练。
所谓的在线训练是指: 模型随时都可以接受新的训练数据继续训练,不会丢失前次训练的结果。
预测算法
在进行预测前需要定义get_prob
方法将频数转换为频率:
def get_prob(self):init_vec = {}trans_mat = {}emit_mat = {}# convert init_vec to probfor key in self.init_vec:init_vec[key] = float(self.init_vec[key]) / self.state_count[key]# convert trans_mat to probfor key1 in self.trans_mat:trans_mat[key1] = {}for key2 in self.trans_mat[key1]:trans_mat[key1][key2] = float(self.trans_mat[key1][key2]) / self.state_count[key1]# convert emit_mat to probfor key1 in self.emit_mat:emit_mat[key1] = {}for key2 in self.emit_mat[key1]:emit_mat[key1][key2] = float(self.emit_mat[key1][key2]) / self.state_count[key1]return init_vec, trans_mat, emit_mat
预测采用Viterbi算法求得最优路径:
def do_predict(self, sequence):tab = [{}]path = {}init_vec, trans_mat, emit_mat = self.get_prob()# initfor state in self.states:tab[0][state] = init_vec[state] * emit_mat[state].get(sequence[0], EPS)path[state] = [state]# build dynamic search tablefor t in range(1, len(sequence)):tab.append({})new_path = {}for state1 in self.states:items = []for state2 in self.states:if tab[t - 1][state2] == 0:continueprob = tab[t - 1][state2] * trans_mat[state2].get(state1, EPS) * emit_mat[state1].get(sequence[t], EPS)items.append((prob, state2))best = max(items) # best: (prob, state)tab[t][state1] = best[0]new_path[state1] = path[best[1]] + [state1]path = new_path# search best pathprob, state = max([(tab[len(sequence) - 1][state], state) for state in self.states])return path[state]
tab是动态规划表, tab[t][state]
表示 时刻t 到达state状态 的所有路径中,概率最大路径的 概率值。
初值tab[0][state]
为emit_mat[state][char0] * init_vec[state]
, 其中char0代表输入序列的第一个单字, 若emit_mat中不存在char0的记录则默认为0.
若输入序列的长度为T, 则执行T-1次迭代, 每次迭代在tab中添加一行. 计算t-1时刻各状态转移到state状态的概率, 选择其中最大者作为tab[t][state]
的值, 并将选中的状态转移写入path
中。
在tab的最后一行中寻找概率最大的状态作为终态,从path
中取出状态转移路径作为标注序列。
保存载入
提供json
和pickle
两种格式的存储. 实现保存方法:
def save(self, filename="hmm.json", code='json'):fw = open(filename, 'w', encoding='utf-8')data = {"trans_mat": self.trans_mat,"emit_mat": self.emit_mat,"init_vec": self.init_vec,"state_count": self.state_count}if code == "json":txt = json.dumps(data)txt = txt.encode('utf-8').decode('unicode-escape')fw.write(txt)elif code == "pickle":pickle.dump(data, fw)
load
方法也很简单:
def load(self, filename="hmm.json", code="json"):fr = open(filename, 'r', encoding='utf-8')if code == "json":txt = fr.read()model = json.loads(txt)elif code == "pickle":model = pickle.load(fr)self.trans_mat = model["trans_mat"]self.emit_mat = model["emit_mat"]self.init_vec = model["init_vec"]self.state_count = model["state_count"]self.inited = True
实现分词器
定义一个工具函数, 对每个字进行标注:
STATES = {'B', 'M', 'E', 'S'}def get_tags(src):tags = []if len(src) == 1:tags = ['S']elif len(src) == 2:tags = ['B', 'E']else:m_num = len(src) - 2tags.append('B')tags.extend(['M'] * m_num)tags.append('S')return tags
最后定义一个工具函数, 根据标注序列将输入的句子分割为词语列表:
def cut_sent(src, tags):word_list = []start = -1started = Falseif len(tags) != len(src):return Noneif tags[-1] not in {'S', 'E'}:if tags[-2] in {'S', 'E'}:tags[-1] = 'S' # for tags: r".*(S|E)(B|M)"else:tags[-1] = 'E' # for tags: r".*(B|M)(B|M)"for i in range(len(tags)):if tags[i] == 'S':if started:started = Falseword_list.append(src[start:i]) # for tags: r"BM*S"word_list.append(src[i])elif tags[i] == 'B':if started:word_list.append(src[start:i]) # for tags: r"BM*B"start = istarted = Trueelif tags[i] == 'E':started = Falseword = src[start:i+1]word_list.append(word)elif tags[i] == 'M':continuereturn word_list
需要说明的是, 因为概率模型无法保证每个B标记后都有对应的E标记, 因此认为B标记后的S和B标记都会像E标记一样结束一个单词。
继承HMModel类, 实现HMMSegger分词器:
class HMMSegger(HMModel):def __init__(self, *args, **kwargs):super(HMMSegger, self).__init__(*args, **kwargs)self.states = STATESself.data = Nonedef load_data(self, filename):self.data = open(filename, 'r', encoding="utf-8")def train(self):if not self.inited:self.setup()# trainfor line in self.data:# pre processingline = line.strip()if not line:continue# get observesobserves = []for i in range(len(line)):if line[i] == " ":continueobserves.append(line[i])# get stateswords = line.split(" ") # spilt word by whitespacestates = []for word in words:if word in seg_stop_words:continuestates.extend(get_tags(word))# resume trainself.do_train(observes, states)
train()
方法根据单词生成观测序列和状态序列, 并送往do_train()
方法进行训练.
编写cut()
方法作为操作接口:
def cut(self, sentence):try:tags = self.do_predict(sentence)return cut_sent(sentence, tags)except:return sentence
最后写几个测试用例看下效果:
>>> segger = HMMSegger()
>>> segger.load_data("data.txt")
>>> segger.train()
>>> segger.cut("长春市长春节讲话")
["长春","市长","春节","讲话"]
>>> segger.cut("生存还是死亡是一个问题")
['生存', '还', '是', '死亡', '是', '一个', '问题']
>>> segger.cut("我只是做了一些微小的工作")
["我","只是","做","了","一些","微小","的","工作"]
分词的结果基本令人满意.
HMModel源码
HMMSegger源码
因依赖关系,若要运行代码请clone整个项目, 并运行python3 test.py
。
因训练数据集过于庞大故未上传,可自行制备训练数据集。 项目中包含训练过的模型data/hmm.json
可以自由体验。