Skip to content
Snippets Groups Projects
Commit cf1e3dd2 authored by Youssef  NIDABRAHIM's avatar Youssef NIDABRAHIM
Browse files

TP5

parent ca60338a
No related branches found
No related tags found
No related merge requests found
import tensorflow as tf
import os
import numpy as np
import random
import string
import argparse
#Programme principal
def main():
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--file', type=str, default='names.txt',help='Fichier d''entrée')
parser.add_argument('--tailleBatch', type=int, default=64, help='taille des batchs')
parser.add_argument('--num_elems', type=int, default=7,help='num_elems nouveaux des batchs')
parser.add_argument('--num_noeuds', type=int, default=64,help='nombre de noeuds cachés LSTM')
args = parser.parse_args()
myRNN(args)
def myRNN(args):
# Lecture du fichier d'apprentissage
fichier = open(args.file, "r")
text = fichier.read()
fichier.close()
'''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''
Mapping caractères minuscules de l'alphabet <-> idenfifiants (numéro du caractère)
Le fichier est supposé écrit uniquement en minuscules, aucun caractère spécial
'''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''
taille_vocabulaire = len(string.ascii_lowercase) + 1
lettre1 = ord(string.ascii_lowercase[0])
def char2id(char):
if char in string.ascii_lowercase:
return ord(char) - lettre1 + 1
elif char == ' ':
return 0
def id2char(id):
if id > 0:
return chr(id + lettre1 - 1)
else:
return ' '
'''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''
Définition de l'ensemble d'apprentissage par batchs : classe GenerateurBatch
'''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''''
tailleBatch=args.tailleBatch
num_elems=args.num_elems
class GenerateurBatch(object):
'''
Un batch est composé d'un tableau de tailleBatch vecteurs numériques de taille taille_vocabulaire
'''
def __init__(self, text, tailleBatch, num_elems):
self._text = text
self._text_size = len(text)
self._tailleBatch = tailleBatch
self._num_elems = num_elems
segment = self._text_size // tailleBatch
self._cursor = [ offset * segment for offset in range(tailleBatch)]
self._last_batch = self._nextBatch()
'''
Génération du batch suivant. Le tableau est composé du dernier vecteur du batch précédent, suivi de num_elems nouveaux vecteurs
'''
def _nextBatch(self):
batch = np.zeros(shape=(self._tailleBatch, taille_vocabulaire), dtype=np.float)
for b in range(self._tailleBatch):
batch[b, char2id(self._text[self._cursor[b]])] = 1.0
self._cursor[b] = (self._cursor[b] + 1) % self._text_size
return batch
def next(self):
batches = [self._last_batch]
for step in range(self._num_elems):
batches.append(self._nextBatch())
self._last_batch = batches[-1]
return batches
# Ensemble de validation
tailleValidation = 1000
valid_text = text[:tailleValidation]
train_text = text[tailleValidation:]
tailleApprentissage = len(train_text)
'''
Calcule, à partir d'une distribution de probabilité sur les caractères, de
la représentation du caractère le plus vraisemblable
'''
def caractere(probabilites):
return [id2char(c) for c in np.argmax(probabilites, 1)]
'''
Convertit une suite de batchs en leur chaine de caractères la plus probable
'''
def batches2string(batches):
s = [''] * batches[0].shape[0]
for b in batches:
s = [''.join(x) for x in zip(s, caractere(b))]
return s
#Définition des batchs d'entraînement et de validation
batchsEntrainement = GenerateurBatch(train_text, tailleBatch, num_elems)
batchsValidation = GenerateurBatch(valid_text, 1, 1)
#log probabilité des vrais labels dans le batch prédit
def logProb(predictions, labels):
precision = 1e-6
predictions[predictions < precision] = precision
return np.sum(np.multiply(labels, -np.log(predictions))) / labels.shape[0]
#Tirage d'un élément à partir d'une distribution
def tirageDistribution(distribution):
r = random.uniform(0, 1)
s = 0
for i in range(len(distribution)):
s += distribution[i]
if s >= r:
return i
return len(distribution) - 1
#Transformation d'une colonne de prédiction en une matrice d'échantillons
def sample(prediction):
p = np.zeros(shape=[1, taille_vocabulaire], dtype=np.float)
p[0, tirageDistribution(prediction[0])] = 1.0
return p
#Génération d'une colonne de probabilités
def alea():
b = np.random.uniform(0.0, 1.0, size=[1, taille_vocabulaire])
return b/np.sum(b, 1)[:,None]
'''
Définition du modèle LSTM
'''
num_noeuds = args.num_noeuds
graph = tf.Graph()
with graph.as_default():
# Paramètres de la porte d'entrée : entrée, sortie récédente, biais
ix = tf.Variable(tf.truncated_normal([taille_vocabulaire, num_noeuds], -0.1, 0.1))
im = tf.Variable(tf.truncated_normal([num_noeuds, num_noeuds], -0.1, 0.1))
ib = tf.Variable(tf.zeros([1, num_noeuds]))
# Paramètres de la porte d'oubli : entrée, sortie récédente, biais
fx = tf.Variable(tf.truncated_normal([taille_vocabulaire, num_noeuds], -0.1, 0.1))
fm = tf.Variable(tf.truncated_normal([num_noeuds, num_noeuds], -0.1, 0.1))
fb = tf.Variable(tf.zeros([1, num_noeuds]))
# Cellule : entrée, état, biais
cx = tf.Variable(tf.truncated_normal([taille_vocabulaire, num_noeuds], -0.1, 0.1))
cm = tf.Variable(tf.truncated_normal([num_noeuds, num_noeuds], -0.1, 0.1))
cb = tf.Variable(tf.zeros([1, num_noeuds]))
# Paramètres de la porte de sortie : entrée, sortie récédente, biais
ox = tf.Variable(tf.truncated_normal([taille_vocabulaire, num_noeuds], -0.1, 0.1))
om = tf.Variable(tf.truncated_normal([num_noeuds, num_noeuds], -0.1, 0.1))
ob = tf.Variable(tf.zeros([1, num_noeuds]))
# Variable de stockage des états lors du dépliage du réseau récurrent
sortieSauvee = tf.Variable(tf.zeros([tailleBatch, num_noeuds]), trainable=False)
etatSauve = tf.Variable(tf.zeros([tailleBatch, num_noeuds]), trainable=False)
#Paramèters du classifieur : poids et biais
w = tf.Variable(tf.truncated_normal([num_noeuds, taille_vocabulaire], -0.1, 0.1))
b = tf.Variable(tf.zeros([taille_vocabulaire]))
#Calcul de la cellule. Calcul des différentes portes, mise à jour de la cellule et de la sortie.
def lstm_cell(i, o, etat):
input_gate = tf.sigmoid(tf.matmul(i, ix) + tf.matmul(o, im) + ib)
forget_gate = tf.sigmoid(tf.matmul(i, fx) + tf.matmul(o, fm) + fb)
update = tf.matmul(i, cx) + tf.matmul(o, cm) + cb
etat = forget_gate * etat + input_gate * tf.tanh(update)
sortie_gate = tf.sigmoid(tf.matmul(i, ox) + tf.matmul(o, om) + ob)
return sortie_gate * tf.tanh(etat), etat
#Données d'entrée
train_data = list()
for _ in range(num_elems + 1):
train_data.append(tf.placeholder(tf.float32, shape=[tailleBatch,taille_vocabulaire]))
train_inputs = train_data[:num_elems]
# le label est le le caractère suivant. On regarde les co occurrences.
train_labels = train_data[1:]
#Dépliage de la boucle LSTM
sorties = list()
sortie = sortieSauvee
etat = etatSauve
for i in train_inputs:
sortie, etat = lstm_cell(i, sortie, etat)
sorties.append(sortie)
#Sauvegarde des états lors du dépliage
with tf.control_dependencies([sortieSauvee.assign(sortie),etatSauve.assign(etat)]):
# Classifieur
logits = tf.nn.xw_plus_b(tf.concat(sorties,0), w, b)
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=tf.concat(train_labels,0)))
#Définition de l'algorithme d'optimisation : descente de gradient avec learning rate adapté
global_step = tf.Variable(0)
learning_rate = tf.train.exponential_decay(10.0, global_step, 5000, 0.1, staircase=True)
optimizer = tf.train.GradientDescentOptimizer(learning_rate)
gradients, v = zip(*optimizer.compute_gradients(loss))
gradients, _ = tf.clip_by_global_norm(gradients, 1.25)
optimizer = optimizer.apply_gradients(zip(gradients, v), global_step=global_step)
#Prédiction
train_prediction = tf.nn.softmax(logits)
#Evaluation : batch 1 sans dépliage
sample_input = tf.placeholder(tf.float32, shape=[1, taille_vocabulaire])
saved_sample_sortie = tf.Variable(tf.zeros([1, num_noeuds]))
saved_sample_etat = tf.Variable(tf.zeros([1, num_noeuds]))
reset_sample_etat = tf.group(saved_sample_sortie.assign(tf.zeros([1, num_noeuds])),saved_sample_etat.assign(tf.zeros([1, num_noeuds])))
sample_sortie, sample_etat = lstm_cell(sample_input, saved_sample_sortie, saved_sample_etat)
with tf.control_dependencies([saved_sample_sortie.assign(sample_sortie),saved_sample_etat.assign(sample_etat)]):
sample_prediction = tf.nn.softmax(tf.nn.xw_plus_b(sample_sortie, w, b))
#Session TF
num_epochs = 30000
display = 1000
num_mots = 40
with tf.Session(graph=graph) as session:
init = tf.global_variables_initializer()
session.run(init)
for step in range(num_epochs):
batches = batchsEntrainement.next()
feed_dict = dict()
for i in range(num_elems + 1):
feed_dict[train_data[i]] = batches[i]
_, l, predictions, lr = session.run([optimizer, loss, train_prediction, learning_rate], feed_dict=feed_dict)
if step % display == 0:
labels = np.concatenate(list(batches)[1:])
# Génération d'un échantillon de mots
print('*' * num_mots)
print('***** Echantillon de mots étape ',step)
print ('***** perplexité : %.2f' % float(np.exp(logProb(predictions, labels))))
print('*' * num_mots)
for _ in range(5):
feed = sample(alea())
phrase = caractere(feed)[0]
reset_sample_etat.run()
for _ in range(num_mots-1):
prediction = sample_prediction.eval({sample_input: feed})
feed = sample(prediction)
phrase += caractere(feed)[0]
print(phrase)
print('*' * num_mots)
# Perplexité de l'ensemble de validation
reset_sample_etat.run()
valid_logProb = 0
for _ in range(tailleValidation):
b = batchsValidation.next()
predictions = sample_prediction.eval({sample_input: b[0]})
valid_logProb = valid_logProb + logProb(predictions, b[1])
print('perplexité de l''ensemble de validation: %.2f' % float(np.exp(valid_logProb / tailleValidation)))
if __name__ == '__main__':
main()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment