# SSOM.py
import tensorflow as tf
import numpy as np
from .SOM import SOM, load_SOM_base
from .tf_util import tf_base
from .util import check_dir

########################
# Supervised SOM layer #
########################
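# SLSOM stacks a single dense sigmoid output layer on top of a trained SOM:
# inputs are projected onto the SOM units (som.sim2units), smoothed over the
# map neighbourhood during training, then mapped to class scores via W and b.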

def init_SLSOM(path,som):
	# Rebuild an SLSOM from the parameters file written by SLSOM.save().
	parameter = {}
	with open(path+"parameters.txt") as f:
		for line in f:
			splitted = line.split("\t")
			parameter[splitted[0]] = splitted[1].strip()
	tmp = SLSOM(som=som,
		nb_label=int(parameter["nb_label"])
		)
	return tmp
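
# A minimal loading sketch (not in the original file), assuming load_SOM_base
# in SOM.py restores the underlying SOM from the same directory:
#	som = load_SOM_base(path)
#	slsom = init_SLSOM(path, som)
#	slsom.load(path)	# restores W and biases written by SLSOM.save()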

class SLSOM(object):
	def __init__(self,som,nb_label,alpha=0.5,verbose=True):
		"""Attach a supervised output layer with nb_label classes to a trained SOM."""
		self.tf_object = som.tf_object
		self.ulen = som.ulen
		self.nb_label = nb_label
		self.som = som
		self.alpha = alpha
		self.verbose = verbose
		with self.tf_object.graph.as_default():
			# Output-layer weights and biases, plus placeholders/ops to restore them.
			self.W = tf.Variable(tf.random_normal([self.ulen,self.nb_label],dtype=tf.float64))
			self.W_loader = tf.placeholder(tf.float64,shape=[self.ulen,self.nb_label])
			self.load_W = self.W.assign(self.W_loader)
			
			self.biases = tf.Variable(tf.random_normal([self.nb_label],dtype=tf.float64))
			self.biases_loader = tf.placeholder(tf.float64,shape=[self.nb_label])
			self.load_biases = self.biases.assign(self.biases_loader)
			
			# Iteration counter driving the neighbourhood decay.
			self.it_max = tf.placeholder(tf.int32)
			self.it = tf.Variable(0,dtype=tf.int32)
			self.update_it = self.it.assign_add(1)
			# Unit-space representation: similarity of each input to every SOM unit.
			self.data = self.som.sim2units(self.som.data2pred)
#			self.data /= tf.sqrt(tf.reduce_sum(tf.pow(self.data,2.0),axis=1,keepdims=True))
			self.data_size = tf.placeholder(tf.int32,shape=[1])	# currently unused
			self.lambda_penality = tf.placeholder(tf.float64,shape=[1])	# L2 penalty weight
			self.max_it = tf.placeholder(tf.int32,shape=[1])	# currently unused; see it_max
			self.labels = tf.placeholder(tf.int32,shape=[None])
			self.train_op = self.minimize_loss()
			self.proba_data_op = self.proba_class_op()
			self.prediction = self.prediction_op(self.proba_data_op)
			self.update_it_som = self.som.it.assign_add(1)
	
	def learning_rate(self,it):
		# Linear decay from 1.0 to 0.0 over it_max iterations.
		tmp = 1.0-tf.cast(it,tf.float64)/tf.cast(self.it_max,tf.float64)
		tmp = tf.where(tf.is_nan(tmp),tf.zeros_like(tmp),tmp)
		return tmp
	
	def save(self,path):
		# Make sure the target directory exists before writing into it.
		check_dir(path)
		W = self.get_W()
		biases = self.get_biases()
		np.savetxt(path+"W.txt",np.array(W))
		np.savetxt(path+"biases.txt",np.array(biases))
		with open(path+"parameters.txt","w") as f:
			f.write("nb_label\t"+str(self.nb_label)+"\n")
	
	def load(self,path):
		W = np.loadtxt(path+"W.txt")
		biases = np.loadtxt(path+"biases.txt")
		self.tf_object.run([self.load_W,self.load_biases],feed_dict={self.W_loader: W, self.biases_loader: biases})
	
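	# minimize_loss builds the training op: each sample's unit-space vector x is
	# smoothed with a row-normalized neighbourhood kernel R over unit positions
	# (radius shrinking with the learning-rate decay), passed through the dense
	# layer y = x.W + b, and scored with sigmoid cross-entropy against one-hot
	# labels plus an L2 penalty on W.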
	def minimize_loss(self):
		x = self.data
		# Pairwise distances between unit positions on the SOM grid.
		unit_dist = tf.sqrt(tf.reduce_sum(tf.pow(
				tf.cast(tf.expand_dims(self.som.units_position,1) - tf.expand_dims(self.som.units_position,0),tf.float64)
				,2.0),2))
		# Neighbourhood kernel; its radius decays with the iteration count.
		neighbour = self.som.R(unit_dist,self.alpha*self.learning_rate(self.it)*max(self.som.dim)/2.0)
		# Row-normalize so each unit's smoothed activation is a weighted mean.
		neighbour /= tf.reduce_sum(neighbour,axis=1,keepdims=True)
		x = tf.reduce_sum(tf.expand_dims(neighbour,0) * tf.expand_dims(x,1),2)
		
		y = tf.matmul(x,self.W) + self.biases
		y_ = tf.one_hot(self.labels,self.nb_label,dtype=tf.float64)
		
		self.loss = tf.reduce_mean(tf.losses.sigmoid_cross_entropy(y_,y))
		# L2 penalty on the output weights, scaled by the lambda_penality placeholder.
		regularizer = tf.contrib.layers.l2_regularizer(self.lambda_penality)
		penality = regularizer(self.W)
		
		optimizer = tf.train.MomentumOptimizer(0.05,0.25)	# lr=0.05, momentum=0.25
		
		loss2 = tf.add(self.loss,penality)
		# The SOM units are trained jointly with the output layer.
		applied = optimizer.minimize(loss2,var_list=[self.W,self.biases,self.som.units])
		return applied
	
	def proba_class_op(self):
		# Class probabilities from the raw unit-space activations; the
		# neighbourhood smoothing in minimize_loss is training-only.
		x = self.data
		y = tf.matmul(x,self.W) + self.biases
		return tf.nn.sigmoid(y)
	
	def prediction_op(self,y):
		return tf.argmax(y,1)
	
	def get_W(self):
		return self.tf_object.run(self.W)
	
	def get_biases(self):
		return self.tf_object.run(self.biases)
	
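	# train runs the optimizer until the loss change drops below 1e-6 or nb_it
	# iterations elapse. With batch_size set, each step draws a class-balanced
	# batch: batch_size samples per label, without replacement.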
	def train(self,data,labels,nb_it=10000,batch_size=None,penality=0.001):
		pen = np.array([penality])
		it = 0
		loss_old = 0.0
		while it < nb_it:
			if batch_size is not None:
				# Class-balanced batch: batch_size samples per label, drawn
				# without replacement (each class needs >= batch_size samples).
				idx = np.concatenate([np.random.choice(np.flatnonzero(labels == c),replace=False,size=[batch_size]) for c in range(self.nb_label)])
				data2 = data[idx,:]
				labels2 = labels[idx]
				# Loss on the new batch before the update, for the convergence test.
				loss_old = self.tf_object.run(self.loss,
					feed_dict={
						self.som.data2pred:data2,
						self.labels:labels2,
						self.lambda_penality:pen,
						self.it_max:nb_it
				})
			else:
				data2 = data
				labels2 = labels
			
			self.tf_object.run(self.train_op,
				feed_dict={
					self.som.data2pred:data2,
					self.labels:labels2,
					self.lambda_penality:pen,
					self.it_max:nb_it
			})
			loss = self.tf_object.run(self.loss,
				feed_dict={
					self.som.data2pred:data2,
					self.labels:labels2,
					self.lambda_penality:pen,
					self.it_max:nb_it
			})
			
			delta_loss = np.absolute(loss-loss_old)
			if self.verbose:
				print("It SLSOM: "+str(it))
				print("Loss delta: ", delta_loss)
			if delta_loss < 1e-6:
				break
			loss_old = loss
			it = self.tf_object.run(self.update_it)
		print("Weights learned")
	
	def predict(self,data):
		# Returns (predicted label indices, per-class sigmoid probabilities).
		pred,proba = self.tf_object.run([self.prediction,self.proba_data_op],
			feed_dict={
				self.som.data2pred:data
			})
		return (pred,proba)
	
	def proba_data(self,data):
		prob = self.tf_object.run(self.proba_data_op,
			feed_dict={
				self.som.data2pred:data
			})
		return prob
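
################
# Usage sketch #
################
# A minimal, illustrative sketch (not part of the original module). It assumes
# a SOM from SOM.py has already been built and trained on `train_data`, and
# that tf_base takes care of session handling and variable initialization:
#
#	slsom = SLSOM(som, nb_label=3)
#	slsom.train(train_data, train_labels, nb_it=10000, batch_size=32)
#	pred, proba = slsom.predict(test_data)
#	slsom.save("model_dir/")
#
# To restore later (assuming the SOM itself was also saved there; see SOM.py):
#	som2 = load_SOM_base("model_dir/")
#	slsom2 = init_SLSOM("model_dir/", som2)
#	slsom2.load("model_dir/")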