【class14】人工智能初步之语音识别
2024-12-26 08:46
以下是从提供的《实验指导.docx》文档中提炼出来的关于人工智能语音识别的Python代码概要:
### 1. 解压数据集
```python
!unzip -q data/data300576/recordings.zip -d wc_work
```
### 2. 切分数据集
```python
import os
import random
# 获取所有音频文件路径
recordings = ['recordings/' + name for name in os.listdir('work/recordings')]
total = []
# 遍历每个音频文件路径,提取标签
for recording in recordings:
label = int(recording[11])
total.append(f'{recording} {label}')
# 创建训练集、验证集和测试集文件
train = open('work/train.tsv', 'w', encoding='UTF-8')
dev = open('work/dev.tsv', 'w', encoding='UTF-8')
test = open('work/test.tsv', 'w', encoding='UTF-8')
# 打乱数据顺序
random.shuffle(total)
# 确定数据集划分的索引
split_num = int((len(total) - 100) * 0.9)
# 写入训练集数据
for line in total[:split_num]:
train.write(line)
# 写入验证集数据
for line in total[split_num:-100]:
dev.write(line)
# 写入测试集数据
for line in total[-100:]:
test.write(line)
# 关闭文件
train.close()
dev.close()
test.close()
```
### 3. 音频数据预处理
```python
import random
import numpy as np
import scipy.io.wavfile as wav
from python_speech_features import mfcc, delta
def get_mfcc(data, fs):
# 提取MFCC特征
wav_feature = mfcc(data, fs)
# 计算一阶差分
d_mfcc_feat = delta(wav_feature, 1)
# 计算二阶差分
d_mfcc_feat2 = delta(wav_feature, 2)
# 拼接特征
feature = np.concatenate([
wav_feature.reshape(1, -1, 13),
d_mfcc_feat.reshape(1, -1, 13),
d_mfcc_feat2.reshape(1, -1, 13)
], axis=0)
# 统一时间维度
if feature.shape[1] > 64:
feature = feature[:, :64, :]
else:
feature = np.pad(feature, ((0, 0), (0, 64 - feature.shape[1]), (0, 0)), 'constant')
# 调整数据维度
feature = feature.transpose((2, 0, 1))
feature = feature[np.newaxis, :]
return feature
def loader(tsv):
datas = []
with open(tsv, 'r', encoding='UTF-8') as f:
for line in f:
audio, label = line.strip().split(' ')
fs, signal = wav.read('work/' + audio)
feature = get_mfcc(signal, fs)
datas.append([feature, int(label)])
return datas
def reader(datas, batch_size, is_random=True):
features = []
labels = []
if is_random:
random.shuffle(datas)
for data in datas:
feature, label = data
features.append(feature)
labels.append(label)
if len(labels) == batch_size:
features = np.concatenate(features, axis=0).reshape(-1, 13, 3, 64).astype('float32')
labels = np.array(labels).reshape(-1, 1).astype('int64')
yield features, labels
features = []
labels = []
```
### 4. 模型搭建
```python
import paddle.fluid as fluid
from paddle.fluid.dygraph import Linear, Conv2D, BatchNorm
from paddle.fluid.layers import softmax_with_cross_entropy, accuracy, reshape
class Audio(fluid.dygraph.Layer):
def __init__(self):
super(Audio, self).__init__()
self.conv1 = Conv2D(13, 16, 3, 1, 1)
self.conv2 = Conv2D(16, 16, (3, 2), (1, 2), (1, 0))
self.conv3 = Conv2D(16, 32, 3, 1, 1)
self.conv4 = Conv2D(32, 32, (3, 2), (1, 2), (1, 0))
self.conv5 = Conv2D(32, 64, 3, 1, 1)
self.conv6 = Conv2D(64, 64, (3, 2), 2)
self.fc1 = Linear(8 * 64, 128)
self.fc2 = Linear(128, 10)
def forward(self, inputs, labels=None):
out = self.conv1(inputs)
out = self.conv2(out)
out = self.conv3(out)
out = self.conv4(out)
out = self.conv5(out)
out = self.conv6(out)
out = reshape(out, [-1, 8 * 64])
out = self.fc1(out)
out = self.fc2(out)
if labels is not None:
loss = softmax_with_cross_entropy(out, labels)
acc = accuracy(out, labels)
return loss, acc
else:
return out
```
### 5. 查看网络结构
```python
import paddle
audio_network = Audio()
paddle.summary(audio_network, input_size=[(64, 13, 3, 64)], dtypes=['float32'])
```
### 6. 模型训练
```python
import numpy as np
import paddle.fluid as fluid
from visualdl import LogWriter
from paddle.fluid.optimizer import Adam
from paddle.fluid.dygraph import to_variable, save_dygraph
writer = LogWriter(logdir="https://blog.csdn.net/fmc121104/article/details/log/train")
train_datas = loader('work/train.tsv')
dev_datas = loader('work/dev.tsv')
place = fluid.CPUPlace()
epochs = 10
with fluid.dygraph.guard(place):
model = Audio()
optimizer = Adam(learning_rate=0.001, parameter_list=model.parameters())
global_step = 0
max_acc = 0
for epoch in range(epochs):
model.train()
train_reader = reader(train_datas, batch_size=64)
for step, data in enumerate(train_reader):
signal, label = [to_variable(_) for _ in data]
loss, acc = model(signal, label)
if step % 20 == 0:
print(f'train epoch: {epoch} step: {step}, loss: {loss.numpy().mean()}, acc: {acc.numpy()}')
writer.add_scalar(tag='train_loss', step=global_step, value=loss.numpy().mean())
writer.add_scalar(tag='train_acc', step=global_step, value=acc.numpy())
global_step += 1
loss.backward()
optimizer.minimize(loss)
model.clear_gradients()
model.eval()
dev_reader = reader(dev_datas, batch_size=64, is_random=False)
accs = []
losses = []
for data in dev_reader:
signal, label = [to_variable(_) for _ in data]
loss, acc = model(signal, label)
losses.append(loss.numpy().mean())
accs.append(acc.numpy())
avg_acc = np.array(accs).mean()
avg_loss = np.array(losses).mean()
if avg_acc > max_acc:
max_acc = avg_acc
print(f'the best accuracy: {max_acc}')
print('saving the best model')
save_dygraph(optimizer.state_dict(), 'best_model')
save_dygraph(model.state_dict(), 'best_model')
print(f'dev epoch: {epoch}, loss: {avg_loss}, acc: {avg_acc}')
writer.add_scalar(tag='dev_loss', step=epoch, value=avg_loss)
writer.add_scalar(tag='dev_acc', step=epoch, value=avg_acc)
print(f'the best accuracy: {max_acc}')
print('saving the final model')
save_dygraph(optimizer.state_dict(), 'final_model')
save_dygraph(model.state_dict(), 'final_model')
```
### 7. 模型测试
```python
import os
import numpy as np
import paddle.fluid as fluid
from paddle.fluid.dygraph import to_variable, load_dygraph
test_datas = loader('work/test.tsv')
print(f'{len(test_datas)} data in test set')
with fluid.dygraph.guard(fluid.CPUPlace()):
model = Audio()
model.eval()
params_dict, _ = load_dygraph('best_model')
model.set_dict(params_dict)
test_reader = reader(test_datas, batch_size=100, is_random=False)
accs = []
for data in test_reader:
signal, label = [to_variable(_) for _ in data]
_, acc = model(signal, label)
accs.append(acc.numpy())
avg_acc = np.array(accs).mean()
print(f'test acc: {avg_acc}')
```
### 8. 用训练好的模型识别语音
```python
import numpy as np
import webrtcvad
import paddle.fluid as fluid
from paddle.fluid.dygraph import to_variable, load_dygraph
def vad(file_path, mode=3):
samp_rate, signal_data = wav.read(file_path)
vad = webrtcvad.Vad(mode=mode)
signal = np.pad(signal_data, (0, 160 - (signal_data.shape[0] % int(samp_rate * 0.02))), 'constant')
lens = signal.shape[0]
signals = np.split(signal, lens // int(samp_rate * 0.02))
audio = []
audios = []
for signal_item in signals:
if vad.is_speech(signal_item.tobytes(), samp_rate):
audio.append(signal_item)
elif len(audio) > 0 and not vad.is_speech(signal_item.tobytes(), samp_rate):
audios.append(np.concatenate(audio, 0))
audio = []
return audios, samp_rate
audios, samp_rate = vad('data/audio.wav')
features = []
for audio in audios:
feature = get_mfcc(audio, samp_rate)
features.append(feature)
features = np.concatenate(features, 0).astype('float32')
with fluid.dygraph.guard(place=fluid.CPUPlace()):
model = Audio()
params_dict, _ = load_dygraph('final_model')
model.set_dict(params_dict)
model.eval()
features = to_variable(features)
out = model(features)
result = ' '.join([str(num) for num in np.argmax(out.numpy(), 1).tolist()])
print(f'语音数字的识别结果是:{result}')
```