SSOM.py 5.72 KB
import tensorflow as tf
import numpy as np
from .SOM import SOM, load_SOM_base
from .tf_util import tf_base
from .util import check_dir

########################
# Supervised SOM layer #
########################

def init_SLSOM(path,som):
	parameter = {}
	with open(path+"parameters.txt") as f:
		for line in f:
			splitted = line.split("\t")
			parameter[splitted[0]] = splitted[1]
	tmp = SLSOM(som=som,
		nb_label=int(parameter["nb_label"]),
		loss_type = parameter["loss_type"]
		)
	return tmp

class SLSOM(object):
	def __init__(self,som,nb_label,loss_type='cross_entropy',alpha0 = 1.0, alpha1 = 0.6,verbose=True):
		self.tf_object = som.tf_object
		self.ulen = som.ulen
		self.nb_label = nb_label
		self.som = som
		self.loss_type = loss_type
		self.alpha0 = alpha0
		self.alpha1 = alpha1
		self.verbose = verbose
		with self.tf_object.graph.as_default():
			self.W = tf.Variable(tf.random_normal([self.ulen,self.nb_label],dtype=tf.float64))
			self.W_loader = tf.placeholder(tf.float64,shape=[self.ulen,self.nb_label])
			self.load_W = self.W.assign(self.W_loader)
			
			self.biases = tf.Variable(tf.random_normal([self.nb_label],dtype=tf.float64))
			self.biases_loader = tf.placeholder(tf.float64,shape=[self.nb_label])
			self.load_biases = self.biases.assign(self.biases_loader)
			self.it_max = tf.placeholder(tf.int32)
			self.it = tf.Variable(0,dtype=tf.int32)
			self.update_it = self.it.assign_add(1)
			self.data = self.som.sim2units(self.som.data2pred)

#			self.datapred = tf.one_hot(
#				self.som.bmu_finder(self.som.data2pred,self.som.units),
#				self.som.ulen,
#				dtype=tf.float64
#			)
			
			self.data_size = tf.placeholder(tf.int32,shape=[1])
			self.lambda_penality = tf.placeholder(tf.float64,shape=[1])
			self.max_it = tf.placeholder(tf.int32,shape=[1])
			self.labels = tf.placeholder(tf.int32,shape=[None])
			self.train_op = self.minimize_loss()
			self.proba_data_op = self.proba_class_op()
			self.prediction = self.prediction_op(self.proba_data_op)
			self.update_it_som = self.som.it.assign_add(1)
	
	def learning_rate(self,it):
		#tmp = 1.0/(tf.cast(self.it,tf.float64)+1.0)
		#return tf.Print(tmp,[tmp],"IT : ")
		tmp = 1.0-tf.cast(self.it,tf.float64)/(tf.cast(self.it_max,tf.float64))
		return tmp
	
	def save(self,path):
		W = self.get_W()
		biases = self.get_biases()
		np.savetxt(path+"W.txt",np.array(W))
		np.savetxt(path+"biases.txt",np.array(biases))
		check_dir(path)
		with open(path+"parameters.txt","w") as f:
			towrite = ""
			towrite += "nb_label\t"+str(self.nb_label)+"\n"
			towrite += "loss_type\t"+str(self.loss_type)+"\n"
			f.write(towrite)
	
	def load(self,path):
		W = np.loadtxt(path+"W.txt")
		biases = np.loadtxt(path+"biases.txt")
		self.tf_object.run([self.load_W,self.load_biases],feed_dict={self.W_loader: W, self.biases_loader: biases})
	
	def minimize_loss(self):
		x = self.data
		dist = self.som.dist2units(self.som.data2pred)
		bmus = tf.argmin(dist,1)
		dist_bmu = self.som.dist_bmus_op(bmus)
		neighbour = self.som.R(dist_bmu,(self.alpha1 + (self.alpha0 - self.alpha1)*tf.cast(self.learning_rate(self.it),tf.float64))*max(self.som.dim))
		x = x*tf.transpose(neighbour)
		
		y = tf.matmul(
				x,
				self.W
			) + self.biases
			
		if self.loss_type == 'cross_entropy':
			self.loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=self.labels))
		else :
			y_ = tf.one_hot(
				self.labels,
				self.nb_label,
				dtype=tf.float64
			)
			self.loss = 0.5*tf.reduce_mean(tf.pow(tf.nn.softmax(y) - y_,2.0))
		regularizer = tf.contrib.layers.l2_regularizer(self.lambda_penality)
		penality = regularizer(self.W)
		
		
		optimizer = tf.train.GradientDescentOptimizer(0.1*self.learning_rate(self.it))
		#optimizer2 = tf.train.GradientDescentOptimizer(0.3*self.learning_rate(self.it))
		
		loss2 = tf.add(self.loss,penality)
		applied = optimizer.minimize(loss2,var_list=[self.W,self.biases,self.som.units])
		#applied2 = optimizer2.minimize(loss2,var_list=[])
		#return (applied,applied2)
		return applied
	
	def proba_class_op(self):
#		x = self.datapred
		x = self.data
		dist = self.som.dist2units(self.som.data2pred)
		bmus = tf.argmin(dist,1)
		dist_bmu = self.som.dist_bmus_op(bmus)
		neighbour = self.som.R(dist_bmu,self.alpha1*max(self.som.dim)/2.0)
		x = x*tf.transpose(neighbour)
		y = tf.matmul(
				x,
				self.W
			) + self.biases
		return tf.nn.softmax(y)
	
	def prediction_op(self,y):
		return tf.argmax(y,1)
	
	def get_W(self):
		return self.tf_object.run(self.W)
	
	def get_biases(self):
		return self.tf_object.run(self.biases)
	
	def train(self,data,labels,nb_it=2000,batch_size=10, penality=0.001):
		nb_data = data.shape[0]
		pen = np.array([penality])
		data2 = data
		loss_old = 0.0
		run = True
		it = 0
		while run:
			if self.verbose:
				print("It SLSOM: "+str(it))
#			idx = np.random.randint(nb_data,size=batch_size)
#			_, loss = self.tf_object.run([self.train_op,self.loss],
#				feed_dict={
#					self.som.data2pred:data2[idx,:],
#					self.data_size:np.array([batch_size]),
#					self.labels:labels[idx],
#					self.lambda_penality:pen,
#					self.it_max:nb_it
#					})
			_, loss = self.tf_object.run([self.train_op,self.loss],
				feed_dict={
					self.som.data2pred:data,
					self.labels:labels,
					self.lambda_penality:pen,
					self.it_max:nb_it
					})
			delta_loss = np.absolute(loss - loss_old)
			if self.verbose:
				print("Diff loss: "+str(delta_loss))
			if delta_loss < np.power(10.0,-6.0) or not it < nb_it:
				run = False
			it = self.tf_object.run(self.update_it)
			loss_old = loss
	
	def predict(self,data):
		pred,proba = self.tf_object.run([self.prediction,self.proba_data_op],
			feed_dict={
				self.som.data2pred:data
			})
		return (pred,proba)
	
	def proba_data(self,data):
		prob = self.tf_object.run(self.proba_data_op,
			feed_dict={
				self.som.data2pred:data
			})
		return prob