First, import libraries
from __future__ import unicode_literals, print_function, division
from io import open
import random
import os
import numpy as np
import time
import math
import glob
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import torch
import torch.nn as nn
from torch import optim
from torch.utils.tensorboard import SummaryWriter
import torch.nn.functional as F
from torchsummary import summary
This is the attention model, built from an encoder network and a decoder network.
class EncoderRNN(nn.Module):
    """Single-layer GRU encoder.

    The module keeps no hidden state of its own; callers create the initial
    hidden state and pass it to ``forward``.

    Args:
        input_size: Number of features in every input step.
        hidden_size: Size of the GRU hidden state.
    """

    def __init__(self, input_size, hidden_size):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.gru = nn.GRU(input_size, hidden_size)

    def forward(self, input, hidden):
        """Advance the GRU over ``input`` starting from ``hidden``.

        Args:
            input: Tensor laid out as (seq_len, batch, input_size), the default
                ``nn.GRU`` layout.
            hidden: Tensor laid out as (1, batch, hidden_size).

        Returns:
            The ``(output, hidden)`` pair produced by the GRU, shaped
            (seq_len, batch, hidden_size) and (1, batch, hidden_size).
        """
        return self.gru(input, hidden)
class AttnDecoderRNN(nn.Module):
    """GRU decoder with learned attention over a fixed number of encoder steps.

    Args:
        hidden_size: Size of the GRU hidden state (must match the encoder's).
        output_size: Number of values predicted (and fed back) per step.
        dropout_p: Dropout probability applied to the embedded input.
        decoder_time_length: Number of encoder steps attended over; it must
            equal the first dimension of ``encoder_outputs`` in ``forward``.
    """

    def __init__(self, hidden_size, output_size, dropout_p, decoder_time_length):
        super(AttnDecoderRNN, self).__init__()
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.dropout_p = dropout_p
        self.decoder_time_length = decoder_time_length

        # The creation order of the layers is kept as-is so that seeded
        # parameter initialisation and state_dict key order do not change.
        self.attn = nn.Linear(self.hidden_size * 2, self.decoder_time_length)
        self.attn_combine = nn.Linear(self.hidden_size * 2, self.hidden_size)
        self.embedding = nn.Linear(self.output_size, self.hidden_size)
        self.dropout = nn.Dropout(self.dropout_p)
        self.gru = nn.GRU(self.hidden_size, self.hidden_size)
        self.out = nn.Linear(self.hidden_size, self.output_size)

    def forward(self, input, hidden, encoder_outputs):
        """Run one decoding step.

        Args:
            input: Previous output (or target), shaped (1, batch, output_size).
            hidden: Decoder hidden state, shaped (1, batch, hidden_size).
            encoder_outputs: Encoder outputs, shaped
                (decoder_time_length, batch, hidden_size).

        Returns:
            Tuple ``(output, hidden, attn_weights)`` shaped
            (batch, output_size), (1, batch, hidden_size) and
            (batch, decoder_time_length).
        """
        # (1, batch, hidden_size)
        step_features = self.dropout(self.embedding(input))

        # Attention scores come from the embedded input and the current hidden
        # state; the softmax normalises over the encoder steps (dim=1).
        query = torch.cat((step_features[0], hidden[0]), 1)  # (batch, 2*hidden_size)
        attn_weights = F.softmax(self.attn(query), dim=1)  # (batch, decoder_time_length)

        # (batch, 1, T) x (batch, T, hidden_size) -> (batch, 1, hidden_size)
        context = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs.permute(1, 0, 2))

        merged = torch.cat((step_features[0], context.squeeze(1)), 1)  # (batch, 2*hidden_size)
        gru_input = F.relu(self.attn_combine(merged).unsqueeze(0))  # (1, batch, hidden_size)

        gru_output, hidden = self.gru(gru_input, hidden)
        prediction = self.out(gru_output[0])  # (batch, output_size)
        return prediction, hidden, attn_weights

    def initHidden(self):
        """Return a zero hidden state for a batch of one on the global ``device``."""
        return torch.zeros(1, 1, self.hidden_size, device=device)
The train and test functions are defined below:
- trainIters(): trains the network using input_tensor (EMG data) and output_tensor (finger angle data). The Adam optimizer and the MSELoss function are used to train the model.
- train(): performs a single training step; it is called from inside trainIters().
- test(): evaluates the network, either in the middle of training or after training has finished.
First, define the function train():
def _train_spread_channels(tensor):
    """Expand a (1, 1, batch, C) tensor to (C, 1, batch, C), one channel per step.

    Step ``c`` of the result holds channel ``c`` of the input at position ``c``
    and zeros everywhere else.
    """
    n_channels = tensor.shape[3]
    spread = torch.zeros(n_channels, tensor.shape[1], tensor.shape[2], n_channels, device=device)
    for idx in range(n_channels):
        spread[idx, :, :, idx] = tensor[0, 0, :, idx]
    return spread


def _train_decode_finger(decoder, hidden, encoder_outputs, targets, start, stop,
                         batch_size, criterion, teacher_forcing):
    """Decode target channels ``start..stop-1`` with one decoder and sum the step losses."""
    decoder_input = torch.zeros(1, batch_size, stop - start, device=device)
    loss = 0
    for di in range(start, stop):
        output, hidden, _ = decoder(decoder_input, hidden, encoder_outputs)
        step_target = targets[di, :, :, start:stop]  # (1, batch, stop - start)
        loss = loss + criterion(output, step_target.squeeze(0))
        if teacher_forcing:
            # Teacher forcing: feed the target as the next input.
            decoder_input = step_target
        else:
            # Otherwise feed the decoder its own prediction.
            decoder_input = output.unsqueeze(0)
    return loss


def train(input_tensor, target_tensor, time_length, encoder, decoder_thumb, decoder_index, decoder_middle,
          decoder_ring, decoder_pinky, encoder_optimizer, decoder_thumb_optimizer, decoder_index_optimizer,
          decoder_middle_optimizer, decoder_ring_optimizer, decoder_pinky_optimizer, criterion, tfr_prob_list, iter):
    """Run one optimisation step of the encoder and the five finger decoders.

    Reads the module-level globals ``device``, ``N_emgsensor`` and
    ``teacher_forcing_ratio``, which are defined outside this block.

    Args:
        input_tensor: Tensor whose first dimension is 1 and whose third
            dimension is the batch; the file's comments describe it as
            (1, 1, batch, 4).
        target_tensor: Tensor whose first dimension is 1, with the same batch
            size and at least 14 channels in the last dimension (channels
            0-1 thumb, 2-4 index, 5-7 middle, 8-10 ring, 11-13 pinky).
        time_length: Unused; kept for interface compatibility.
        encoder: Encoder module exposing ``hidden_size``.
        decoder_thumb, decoder_index, decoder_middle, decoder_ring,
        decoder_pinky: Attention decoders, one per finger.
        encoder_optimizer, decoder_*_optimizer: Optimizers for the modules above.
        criterion: Loss function applied to every decoding step.
        tfr_prob_list: Sequence of random draws; entry ``iter`` decides whether
            teacher forcing is used for this step.
        iter: Index of the current iteration inside the epoch.

    Returns:
        Tuple of six values: the total loss divided by 14 as a Python float,
        followed by the thumb, index, middle, ring and pinky losses, each
        averaged over its decoding steps and returned as a detached 0-dim
        tensor.
    """
    # The original compared input_tensor with itself, which could never fail.
    assert input_tensor.shape[2] == target_tensor.shape[2]  # batch size
    assert input_tensor.shape[0] == 1
    assert target_tensor.shape[0] == 1

    new_input_tensor = _train_spread_channels(input_tensor)
    new_target_tensor = _train_spread_channels(target_tensor)
    batch_size = new_input_tensor.shape[2]

    encoder_hidden = torch.zeros(1, batch_size, encoder.hidden_size, device=device)

    optimizers = (encoder_optimizer, decoder_thumb_optimizer, decoder_index_optimizer,
                  decoder_middle_optimizer, decoder_ring_optimizer, decoder_pinky_optimizer)
    for optimizer in optimizers:
        optimizer.zero_grad()

    # Feed the encoder one spread-out sensor channel per step.
    encoder_outputs = torch.zeros(N_emgsensor, batch_size, encoder.hidden_size, device=device)
    for ei in range(N_emgsensor):
        encoder_output, encoder_hidden = encoder(new_input_tensor[ei], encoder_hidden)
        encoder_outputs[ei] = encoder_output[0]

    use_teacher_forcing = tfr_prob_list[iter] < teacher_forcing_ratio

    # (decoder, first target channel, one-past-last target channel)
    fingers = (
        (decoder_thumb, 0, 2),
        (decoder_index, 2, 5),
        (decoder_middle, 5, 8),
        (decoder_ring, 8, 11),
        (decoder_pinky, 11, 14),
    )
    # Every decoder starts from the final encoder hidden state.
    finger_losses = [
        _train_decode_finger(decoder, encoder_hidden, encoder_outputs, new_target_tensor,
                             start, stop, batch_size, criterion, use_teacher_forcing)
        for decoder, start, stop in fingers
    ]

    loss_total = finger_losses[0]
    for finger_loss in finger_losses[1:]:
        loss_total = loss_total + finger_loss
    loss_total.backward()

    for optimizer in optimizers:
        optimizer.step()

    n_steps_total = sum(stop - start for _, start, stop in fingers)
    # Detach the per-finger losses so callers that accumulate them do not keep
    # references into this step's autograd graph.
    per_finger = tuple(
        finger_loss.detach() / (stop - start)
        for finger_loss, (_, start, stop) in zip(finger_losses, fingers)
    )
    return (loss_total.item() / n_steps_total,) + per_finger
and define function trainIters()
def trainIters(input_data, output_data, input_data_eval, output_data_eval, time_length, encoder, decoder_thumb,
               decoder_index, decoder_middle, decoder_ring, decoder_pinky, n_epochs, eval_every, test_every,
               learning_rate_encoder, learning_rate_decoder, batch_size):
    """Train the encoder and the five finger decoders, with periodic eval/test.

    Reads the module-level globals ``test_path``, ``writer``, ``save_path``,
    ``name`` and ``model_path`` (defined outside this block) and calls the
    sibling functions ``dataprepare``, ``dataloader``, ``train``, ``test``,
    ``gettestACC`` and ``timeSince``.

    Args:
        input_data: Array indexed as (channels, samples); the file's comments
            describe it as (4, data_length).
        output_data: Array indexed as (channels, samples); described as
            (14, data_length).
        input_data_eval, output_data_eval: Evaluation arrays in the same layout.
        time_length: Window length in samples.
        encoder, decoder_thumb, decoder_index, decoder_middle, decoder_ring,
        decoder_pinky: Modules to optimise.
        n_epochs: Number of epochs to run.
        eval_every: Evaluate every this many epochs.
        test_every: Test (and possibly checkpoint) every this many epochs.
        learning_rate_encoder: Adam learning rate for the encoder.
        learning_rate_decoder: Adam learning rate for every decoder.
        batch_size: Number of windows per training step.

    Raises:
        ValueError: If the data is too short to form a single batch.
    """
    best_test_mse = 100
    best_eval_mse = 100

    test_input_data, test_output_data, _, _ = dataprepare(test_path, test=True)
    # test_output_data convert 0 <-> 1
    test_output_data = 1 - test_output_data

    n_windows = input_data.shape[1] - time_length
    num_iters = n_windows // batch_size
    if num_iters < 1:
        raise ValueError(
            "not enough data for one batch: %d windows, batch_size %d" % (n_windows, batch_size))
    # Print roughly every 30% of an epoch. The max() guards against a modulo
    # by zero when an epoch has only a few iterations.
    print_every = max(1, int(0.3 * n_windows // batch_size))

    start = time.time()

    encoder_optimizer = optim.Adam(encoder.parameters(), lr=learning_rate_encoder)
    decoder_thumb_optimizer = optim.Adam(decoder_thumb.parameters(), lr=learning_rate_decoder)
    decoder_index_optimizer = optim.Adam(decoder_index.parameters(), lr=learning_rate_decoder)
    decoder_middle_optimizer = optim.Adam(decoder_middle.parameters(), lr=learning_rate_decoder)
    decoder_ring_optimizer = optim.Adam(decoder_ring.parameters(), lr=learning_rate_decoder)
    decoder_pinky_optimizer = optim.Adam(decoder_pinky.parameters(), lr=learning_rate_decoder)
    criterion = nn.MSELoss()

    epoch_tags = ('Loss_total/epoch', 'Loss_thumb/epoch', 'Loss_index/epoch',
                  'Loss_middle/epoch', 'Loss_ring/epoch', 'Loss_pinky/epoch')
    eval_tags = ('Loss_total/eval', 'Loss_thumb/eval', 'Loss_index/eval',
                 'Loss_middle/eval', 'Loss_ring/eval', 'Loss_pinky/eval')

    for epoch in range(1, n_epochs + 1):
        print('========== epoch : %d ==========' % (epoch))
        randomindex = list(range(n_windows))
        random.Random(epoch).shuffle(randomindex)

        # Seed once per epoch BEFORE drawing the teacher-forcing schedule so
        # that every epoch (including the first) is reproducible. Previously
        # the seed was re-applied on every iteration, after the draw.
        np.random.seed(epoch)
        tfr_prob_list = np.random.random(num_iters)

        epoch_totals = [0, 0, 0, 0, 0, 0]
        for iter in range(num_iters):
            input_tensor, target_tensor = dataloader(iter, time_length, input_data, output_data,
                                                     randomindex, batchsize=batch_size)
            step_losses = train(input_tensor, target_tensor, time_length, encoder, decoder_thumb,
                                decoder_index, decoder_middle, decoder_ring, decoder_pinky,
                                encoder_optimizer, decoder_thumb_optimizer, decoder_index_optimizer,
                                decoder_middle_optimizer, decoder_ring_optimizer, decoder_pinky_optimizer,
                                criterion, tfr_prob_list, iter)
            loss_5finger = step_losses[0]
            writer.add_scalar('Loss/iter', loss_5finger, (epoch - 1) * num_iters + iter)
            epoch_totals = [total + step for total, step in zip(epoch_totals, step_losses)]
            if iter % print_every == 0:
                print('iter : %d , loss_5finger : %.5f' % (iter, loss_5finger))

        epoch_avgs = [total / num_iters for total in epoch_totals]
        for tag, value in zip(epoch_tags, epoch_avgs):
            writer.add_scalar(tag, value, epoch)
        loss_avg = epoch_avgs[0]
        print('%s (%d %d%%) loss_avg : %.9f' % (timeSince(start, epoch / n_epochs),
                                                epoch, epoch / n_epochs * 100, loss_avg))

        if epoch % eval_every == 0:
            eval_result = test(input_data_eval, output_data_eval, time_length, encoder, decoder_thumb,
                               decoder_index, decoder_middle, decoder_ring, decoder_pinky)
            eval_pred_target = eval_result[0]
            eval_losses = eval_result[1:7]
            eval_attention_scores = eval_result[7]
            np.save(save_path + name + '/epoch_' + str(epoch) + '_eval_attention_scores.npy',
                    eval_attention_scores.cpu().numpy())
            eval_mse = gettestACC(eval_pred_target, output_data_eval)
            print("current eval mse : %.3f" % (eval_mse))
            print("best eval mse : %.3f" % (best_eval_mse))
            writer.add_scalar('EvalAcc/epoch', eval_mse, epoch)
            writer.add_scalar('bestEvalAcc/epoch', best_eval_mse, epoch)
            print('=======================================')
            for tag, value in zip(eval_tags, eval_losses):
                writer.add_scalar(tag, value, epoch)
            if eval_mse < best_eval_mse:
                best_eval_mse = eval_mse
                print("new eval mse : %.3f" % (best_eval_mse))
                print('save eval attention and eval pred angle')
                np.save(save_path + name + '/best_eval_pred_target.npy', eval_pred_target)
                np.save(save_path + name + '/best_eval_attention_scores.npy',
                        eval_attention_scores.cpu().numpy())

        if epoch % test_every == 0:
            test_result = test(test_input_data, test_output_data, time_length, encoder, decoder_thumb,
                               decoder_index, decoder_middle, decoder_ring, decoder_pinky)
            test_pred_target = test_result[0]
            test_attention_scores = test_result[7]
            test_mse = gettestACC(test_pred_target, test_output_data)
            print("current test mse : %.3f" % (test_mse))
            print("best test mse : %.3f" % (best_test_mse))
            writer.add_scalar('TestAcc/epoch', test_mse, epoch)
            writer.add_scalar('bestTestAcc/epoch', best_test_mse, epoch)
            print('=======================================')
            if test_mse < best_test_mse:
                best_test_mse = test_mse
                print("new test mse : %.3f" % (best_test_mse))
                print('savemodel when best test mse!')
                torch.save(encoder.state_dict(), model_path + name + '_encoder')
                torch.save(decoder_thumb.state_dict(), model_path + name + '_attention_decoder_thumb')
                torch.save(decoder_index.state_dict(), model_path + name + '_attention_decoder_index')
                torch.save(decoder_middle.state_dict(), model_path + name + '_attention_decoder_middle')
                torch.save(decoder_ring.state_dict(), model_path + name + '_attention_decoder_ring')
                torch.save(decoder_pinky.state_dict(), model_path + name + '_attention_decoder_pinky')
                print('save test attention and test pred angle')
                np.save(save_path + name + '/best_test_pred_target.npy', test_pred_target)
                np.save(save_path + name + '/best_test_attention_scores.npy',
                        test_attention_scores.cpu().numpy())
and finally define function test()
def _eval_spread_channels(tensor):
    """Expand a (1, 1, batch, C) tensor to (C, 1, batch, C), one channel per step.

    Step ``c`` of the result holds channel ``c`` of the input at position ``c``
    and zeros everywhere else.
    """
    n_channels = tensor.shape[3]
    spread = torch.zeros(n_channels, tensor.shape[1], tensor.shape[2], n_channels, device=device)
    for idx in range(n_channels):
        spread[idx, :, :, idx] = tensor[0, 0, :, idx]
    return spread


def _eval_decode_finger(decoder, hidden, encoder_outputs, targets, start, stop, batch_size,
                        criterion, predictions, attentionscores, window_idx, time_length):
    """Decode channels ``start..stop-1`` free-running; record predictions and attention."""
    decoder_input = torch.zeros(1, batch_size, stop - start, device=device)
    loss = 0
    for di in range(start, stop):
        output, hidden, attention = decoder(decoder_input, hidden, encoder_outputs)
        # Step di predicts channel di; keep the value of the first batch item.
        predictions[di, window_idx * time_length] = output[0, di - start].item()
        loss = loss + criterion(output, targets[di, :, :, start:stop].squeeze(0))
        decoder_input = output.unsqueeze(0)
        # save attentionscores
        attentionscores[N_emgsensor * di:N_emgsensor * (di + 1), window_idx] = attention
    return loss


def test(input_data, output_data, time_length, encoder, decoder_thumb, decoder_index,
         decoder_middle, decoder_ring, decoder_pinky):
    """Evaluate the encoder/decoders on ``input_data`` without teacher forcing.

    Reads the module-level globals ``device`` and ``N_emgsensor`` and calls the
    sibling function ``testdataloader``. All modules are switched to
    evaluation mode for the duration of the call (so Dropout is disabled) and
    restored to their previous mode afterwards.

    Args:
        input_data: Array indexed as (channels, samples).
        output_data: Array indexed as (channels, samples) with at least 14
            rows; predictions are written into an array of the same shape.
        time_length: Window length passed to ``testdataloader``; the asserts
            below require every window to have a first dimension of 1.
        encoder: Encoder module exposing ``hidden_size``.
        decoder_thumb, decoder_index, decoder_middle, decoder_ring,
        decoder_pinky: Attention decoders, one per finger.

    Returns:
        Tuple ``(predict_target_tensor, loss_avg, loss1_avg, loss2_avg,
        loss3_avg, loss4_avg, loss5_avg, attentionscores)``: the NumPy array
        of predictions, the total and per-finger (thumb, index, middle, ring,
        pinky) losses averaged over windows and decoding steps, and the
        attention tensor of shape (N_emgsensor * 14, number of windows).

    Raises:
        ValueError: If ``testdataloader`` yields no windows.
    """
    criterion = nn.MSELoss()
    input_tensor_list, target_tensor_list = testdataloader(time_length, input_data, output_data)
    n_windows = len(input_tensor_list)
    if n_windows == 0:
        raise ValueError("no evaluation windows: data is shorter than time_length")

    predict_target_tensor = np.zeros_like(output_data)
    # define attention score matrix
    attentionscores = torch.zeros(N_emgsensor * 14, len(target_tensor_list), device=device)

    # (decoder, first target channel, one-past-last target channel)
    fingers = (
        (decoder_thumb, 0, 2),
        (decoder_index, 2, 5),
        (decoder_middle, 5, 8),
        (decoder_ring, 8, 11),
        (decoder_pinky, 11, 14),
    )
    finger_totals = [0, 0, 0, 0, 0]
    loss_5finger_total = 0

    # Dropout must be inactive while evaluating; remember each module's mode
    # so training can continue unchanged after this call.
    modules = (encoder, decoder_thumb, decoder_index, decoder_middle, decoder_ring, decoder_pinky)
    previous_modes = [module.training for module in modules]
    for module in modules:
        module.eval()
    try:
        with torch.no_grad():
            for idx0, (input_tensor, target_tensor) in enumerate(zip(input_tensor_list, target_tensor_list)):
                # The original compared input_tensor with itself, which could never fail.
                assert input_tensor.shape[2] == target_tensor.shape[2]  # batch size
                assert input_tensor.shape[0] == 1
                assert target_tensor.shape[0] == 1

                new_input_tensor = _eval_spread_channels(input_tensor)
                new_target_tensor = _eval_spread_channels(target_tensor)
                batch_size = new_input_tensor.shape[2]

                encoder_hidden = torch.zeros(1, batch_size, encoder.hidden_size, device=device)
                encoder_outputs = torch.zeros(N_emgsensor, batch_size, encoder.hidden_size, device=device)
                for ei in range(N_emgsensor):
                    encoder_output, encoder_hidden = encoder(new_input_tensor[ei], encoder_hidden)
                    encoder_outputs[ei] = encoder_output[0]

                # Every decoder starts from the final encoder hidden state.
                window_losses = [
                    _eval_decode_finger(decoder, encoder_hidden, encoder_outputs, new_target_tensor,
                                        start, stop, batch_size, criterion, predict_target_tensor,
                                        attentionscores, idx0, time_length)
                    for decoder, start, stop in fingers
                ]

                # loss_5finger: sum of all 14 step losses
                loss_5finger = window_losses[0]
                for window_loss in window_losses[1:]:
                    loss_5finger = loss_5finger + window_loss
                finger_totals = [total + loss for total, loss in zip(finger_totals, window_losses)]
                loss_5finger_total += loss_5finger
    finally:
        for module, was_training in zip(modules, previous_modes):
            module.train(was_training)

    loss_avg = loss_5finger_total / (n_windows * 14)
    loss1_avg = finger_totals[0] / (n_windows * 2)
    loss2_avg = finger_totals[1] / (n_windows * 3)
    loss3_avg = finger_totals[2] / (n_windows * 3)
    loss4_avg = finger_totals[3] / (n_windows * 3)
    loss5_avg = finger_totals[4] / (n_windows * 3)
    print("eval total loss : %.9f " % (loss_avg))
    return (predict_target_tensor, loss_avg, loss1_avg, loss2_avg, loss3_avg, loss4_avg, loss5_avg,
            attentionscores)
def mse(y, t):
    """Return the square root of half the mean squared difference of ``y`` and ``t``.

    Note that, despite the name, this is not the plain mean squared error.
    """
    squared_error = (y - t) ** 2
    return np.sqrt(0.5 * np.mean(squared_error))
def gettestACC(y, t):
    """Average ``mse`` over the first 14 rows of ``y`` and ``t``.

    Args:
        y: Predictions, indexed as (row, sample) with at least 14 rows.
        t: Targets in the same layout.

    Returns:
        The mean of the 14 per-row ``mse`` values.
    """
    n_rows = 14
    total = 0
    for row in range(n_rows):
        total = total + mse(y[row, :], t[row, :])
    return total / n_rows
import pickle
class Data:
    """Plain container pairing ``x_data`` with ``y_data``.

    ``dataprepare`` reads these two attributes from unpickled objects, so the
    class and attribute names must stay as they are.
    """

    def __init__(self, x_data, y_data):
        self.x_data, self.y_data = x_data, y_data
def synctime(inputdata, outputdata, starttime, videofs, emgfs):
    """Plot the start of both recordings on one figure to check their alignment.

    Plots column 1 of ``inputdata`` against column 0 for the first
    ``starttime * emgfs`` rows, and row 0 of ``outputdata`` against row 14 for
    the first ``starttime * videofs`` columns.
    NOTE(review): presumably column 0 / row 14 are time stamps - confirm.
    """
    import matplotlib.pyplot as plt

    emg_stop = starttime * emgfs
    video_stop = starttime * videofs
    plt.plot(inputdata[:emg_stop, 0], inputdata[:emg_stop, 1])
    plt.plot(outputdata[14, :video_stop], outputdata[0, :video_stop])
    plt.show()
def testdataloader(timelength, inputdata, outputdata):
    """Cut both arrays into consecutive, non-overlapping windows of tensors.

    Tensors are created on the module-level ``device``.

    Args:
        timelength: Number of samples per window.
        inputdata: Array indexed as (channels, samples).
        outputdata: Array indexed as (channels, samples).

    Returns:
        Two lists of float32 tensors (inputs, targets), one entry per window,
        each shaped (timelength, 1, 1, channels). A trailing remainder shorter
        than ``timelength`` is dropped.
    """
    def window_tensor(array, begin, end):
        # (channels, timelength) -> (timelength, channels) -> (timelength, 1, 1, channels)
        window = torch.tensor(np.transpose(array[:, begin:end]), dtype=torch.float32, device=device)
        return window.unsqueeze(1).unsqueeze(1)

    inputTensorList, targetTensorList = [], []
    begin, end = 0, timelength
    while end <= inputdata.shape[1]:
        inputTensorList.append(window_tensor(inputdata, begin, end))
        targetTensorList.append(window_tensor(outputdata, begin, end))
        begin, end = end, end + timelength
    return inputTensorList, targetTensorList
def dataloader(iter, timelength, inputdata, outputdata, randomindex, batchsize):
    """Build one training batch of windows starting at shuffled positions.

    Tensors are created on the module-level ``device``.

    Args:
        iter: Index of the batch inside the epoch.
        timelength: Number of consecutive samples per window.
        inputdata: Array of shape (4, data_length).
        outputdata: Array of shape (14, data_length).
        randomindex: Shuffled window start positions; batch ``iter`` uses
            entries ``batchsize * iter`` to ``batchsize * (iter + 1) - 1``.
        batchsize: Number of windows per batch.

    Returns:
        Tuple ``(input_tensor_group, target_tensor_group)`` of float32 tensors
        shaped (timelength, 1, batch, 4) and (timelength, 1, batch, 14), or
        ``(None, None)`` when ``timelength`` is 0.
    """
    assert inputdata.shape[0] == 4
    assert outputdata.shape[0] == 14
    assert inputdata.shape[1] == outputdata.shape[1]

    first = batchsize * iter
    last = batchsize * (iter + 1)
    # Use the `inputdata` parameter; the original read an unrelated global
    # named `input_data` here.
    is_full_batch = last <= inputdata.shape[1] - timelength
    if is_full_batch:
        batch_starts = randomindex[first:last]
    else:
        # Last, partial batch: take everything that is left. The original
        # sliced with an end index of -1, which dropped the final sample.
        batch_starts = randomindex[first:]

    input_steps, target_steps = [], []
    for idx in range(timelength):
        columns = [x + idx for x in batch_starts]
        input_tensor = torch.tensor(np.transpose(inputdata[:, columns]),
                                    dtype=torch.float32, device=device)  # (bs, 4)
        target_tensor = torch.tensor(np.transpose(outputdata[:, columns]),
                                     dtype=torch.float32, device=device)  # (bs, 14)
        input_steps.append(input_tensor.unsqueeze(0).unsqueeze(0))  # (1, 1, bs, 4)
        target_steps.append(target_tensor.unsqueeze(0).unsqueeze(0))  # (1, 1, bs, 14)

    if not input_steps:
        return None, None
    if is_full_batch:
        assert input_steps[-1].shape[2] == batchsize
    # Concatenate once instead of growing the tensors on every step.
    return torch.cat(input_steps, dim=0), torch.cat(target_steps, dim=0)
def dataprepare(datapath, doesEval=False, test=False, n_eval=5):
    """Load pickled recordings and concatenate them into training/eval arrays.

    Every ``*.pkl`` file in ``datapath`` must unpickle to an object with
    ``x_data`` and ``y_data`` sequences of equal length whose items can be
    concatenated along axis 1. Both sequences are shuffled with the same fixed
    seed, so pairs stay aligned.

    Args:
        datapath: Directory containing the ``*.pkl`` files.
        doesEval: If True, hold out the last ``n_eval`` shuffled recordings as
            an evaluation set; otherwise merge them into the training arrays
            (held-out recordings first, as before).
        test: If True, stop after the first file.
        n_eval: Number of recordings to hold out (default 5). At least one
            recording always stays in the training set, so fewer are held out
            when there are not enough recordings.

    Returns:
        Tuple ``(emgarray, anglearray, emgarray_eval, anglearray_eval)``; the
        last two are ``None`` unless ``doesEval`` is True.

    Raises:
        ValueError: If no recordings are found, or if ``doesEval`` is True and
            there are too few recordings to hold any out.
    """
    emglist, anglelist = [], []
    for filepath in glob.glob(os.path.join(datapath, '*.pkl')):
        print(filepath)
        with open(filepath, 'rb') as f:
            # NOTE: pickle.load can execute arbitrary code; only point this at
            # trusted data files.
            data = pickle.load(f)
        # Copy into our own lists so the shuffle below never mutates `data`.
        emglist.extend(data.x_data)
        anglelist.extend(data.y_data)
        if test:
            print("test on a single experiment data")
            break

    if not emglist:
        raise ValueError("no recordings found in %r" % (datapath,))
    assert len(emglist) == len(anglelist)

    # The same seed gives the same permutation for both lists.
    random.Random(0).shuffle(emglist)
    random.Random(0).shuffle(anglelist)

    n_total = len(emglist)
    n_holdout = max(0, min(n_eval, n_total - 1))
    split = n_total - n_holdout
    train_emg, eval_emg = emglist[:split], emglist[split:]
    train_angle, eval_angle = anglelist[:split], anglelist[split:]

    if doesEval:
        if n_holdout == 0:
            raise ValueError("not enough recordings to hold out an evaluation set")
        emgarray = np.concatenate(train_emg, axis=1)
        anglearray = np.concatenate(train_angle, axis=1)
        emgarray_eval = np.concatenate(eval_emg, axis=1)
        anglearray_eval = np.concatenate(eval_angle, axis=1)
    else:
        # Held-out recordings go first, matching the previous column order.
        emgarray = np.concatenate(eval_emg + train_emg, axis=1)
        anglearray = np.concatenate(eval_angle + train_angle, axis=1)
        emgarray_eval = None
        anglearray_eval = None
    return emgarray, anglearray, emgarray_eval, anglearray_eval
def asMinutes(s):
    """Format a duration of ``s`` seconds as ``'<minutes>m <seconds>s'``."""
    minutes = math.floor(s / 60)
    seconds = s - minutes * 60
    return '%dm %ds' % (minutes, seconds)
def timeSince(since, percent):
    """Describe elapsed and estimated remaining time.

    Args:
        since: Start time as returned by ``time.time()``.
        percent: Fraction of the work completed so far; must be non-zero.

    Returns:
        A string ``'<elapsed> (- <remaining>)'`` with both parts formatted by
        ``asMinutes``.
    """
    elapsed = time.time() - since
    estimated_total = elapsed / (percent)
    remaining = estimated_total - elapsed
    return '%s (- %s)' % (asMinutes(elapsed), asMinutes(remaining))
def showPlot(points):
    """Open a new matplotlib figure, plot ``points`` on it and show it."""
    plt.figure()
    plt.plot(points)
    plt.show()