# SSOM.py
import tensorflow as tf
import numpy as np
from .SOM import SOM, load_SOM_base
from .tf_util import tf_base
from .util import check_dir

########################
# Supervised SOM layer #
########################

def init_SLSOM(path, som):
	"""Rebuild an SLSOM from a directory written by SLSOM.save()."""
	parameter = {}
	with open(path + "parameters.txt") as f:
		for line in f:
			# Strip the trailing newline so values such as loss_type compare
			# cleanly against string literals later on.
			splitted = line.rstrip("\n").split("\t")
			parameter[splitted[0]] = splitted[1]
	tmp = SLSOM(som=som,
		nb_label=int(parameter["nb_label"]),
		loss_type=parameter["loss_type"]
		)
	return tmp
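# Restore sketch: rebuild the supervised layer from a directory written by
# SLSOM.save(). Here load_SOM_base is assumed to reload the underlying SOM
# saved alongside it (see SOM.py for its actual signature):
#
#   som = load_SOM_base("model_dir/")
#   slsom = init_SLSOM("model_dir/", som)
#   slsom.load("model_dir/")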

class SLSOM(object):
	"""Supervised layer on top of a SOM: a softmax classifier over SOM units."""

	def __init__(self, som, nb_label, loss_type='cross_entropy', verbose=True):
		self.tf_object = som.tf_object
		self.ulen = som.ulen
		self.nb_label = nb_label
		self.som = som
		self.loss_type = loss_type
		self.verbose = verbose
		with self.tf_object.graph.as_default():
			# Softmax layer parameters, with loader ops so saved values can be
			# assigned back into the graph (see load()).
			self.W = tf.Variable(tf.random_normal([self.ulen, self.nb_label], dtype=tf.float64))
			self.W_loader = tf.placeholder(tf.float64, shape=[self.ulen, self.nb_label])
			self.load_W = self.W.assign(self.W_loader)

			self.biases = tf.Variable(tf.random_normal([self.nb_label], dtype=tf.float64))
			self.biases_loader = tf.placeholder(tf.float64, shape=[self.nb_label])
			self.load_biases = self.biases.assign(self.biases_loader)

			# Iteration counter driving the decaying learning rate.
			self.it_max = tf.placeholder(tf.int32)
			self.it = tf.Variable(0, dtype=tf.int32)
			self.update_it = self.it.assign_add(1)

			# Training features: similarity of the input to every SOM unit.
			self.data = self.som.sim2units(self.som.data2pred)

			# Prediction features: one-hot encoding of the best-matching unit.
			self.datapred = tf.one_hot(
				self.som.bmu_finder(self.som.data2pred, self.som.units),
				self.som.ulen,
				dtype=tf.float64
			)

			self.lambda_penality = tf.placeholder(tf.float64, shape=[1])
			self.labels = tf.placeholder(tf.int32, shape=[None])
			self.train_op = self.minimize_loss()
			self.proba_data_op = self.proba_class_op()
			self.prediction = self.prediction_op(self.proba_data_op)
			# Exposed so callers can also advance the SOM's own counter.
			self.update_it_som = self.som.it.assign_add(1)
	
	def learning_rate(self, it):
		# Linear decay from 1.0 at it=0 to 0.0 at it=it_max.
		return 1.0 - tf.cast(it, tf.float64) / tf.cast(self.it_max, tf.float64)
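	# For example, with it_max=2000 the rate is 1.0 at it=0, 0.5 at it=1000
	# and 0.0 at it=2000; the same schedule also shrinks the neighbourhood
	# radius used in minimize_loss().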
	
	def save(self, path):
		# Make sure the target directory exists before writing anything.
		check_dir(path)
		W = self.get_W()
		biases = self.get_biases()
		np.savetxt(path + "W.txt", np.array(W))
		np.savetxt(path + "biases.txt", np.array(biases))
		with open(path + "parameters.txt", "w") as f:
			towrite = ""
			towrite += "nb_label\t" + str(self.nb_label) + "\n"
			towrite += "loss_type\t" + str(self.loss_type) + "\n"
			f.write(towrite)
	
	def load(self,path):
		W = np.loadtxt(path+"W.txt")
		biases = np.loadtxt(path+"biases.txt")
		self.tf_object.run([self.load_W,self.load_biases],feed_dict={self.W_loader: W, self.biases_loader: biases})
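	# Round-trip sketch: `path` is concatenated directly with the file names,
	# so it must end with a separator, e.g. "model_dir/":
	#
	#   slsom.save("model_dir/")
	#   slsom.load("model_dir/")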
	
	def minimize_loss(self):
		# Weight each unit activation by the neighbourhood kernel around the
		# input's BMU so supervised updates stay topologically smooth.
		x = self.data
		dist = self.som.dist2units(self.som.data2pred)
		bmus = tf.argmin(dist, 1)
		dist_bmu = self.som.dist_bmus_op(bmus)
		neighbour = self.som.R(dist_bmu, self.learning_rate(self.it) * max(self.som.dim) / 2.0)
		x = x * tf.transpose(neighbour)

		# Logits of the softmax layer.
		y = tf.matmul(x, self.W) + self.biases

		if self.loss_type == 'cross_entropy':
			loss = tf.reduce_mean(
				tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y, labels=self.labels))
		else:
			# Quadratic loss between the softmax output and one-hot labels.
			y_ = tf.one_hot(self.labels, self.nb_label, dtype=tf.float64)
			loss = 0.5 * tf.reduce_mean(tf.pow(tf.nn.softmax(y) - y_, 2.0))

		# L2 penalty on the classifier weights, scaled by lambda_penality.
		regularizer = tf.contrib.layers.l2_regularizer(self.lambda_penality)
		penality = regularizer(self.W)
		loss2 = tf.add(loss, penality)

		# Two decaying-rate optimizers: one updates the supervised layer, the
		# other back-propagates the same loss into the SOM units themselves.
		optimizer = tf.train.GradientDescentOptimizer(0.3 * self.learning_rate(self.it))
		optimizer2 = tf.train.GradientDescentOptimizer(self.learning_rate(self.it))
		applied = optimizer.minimize(loss2, var_list=[self.W, self.biases])
		applied2 = optimizer2.minimize(loss2, var_list=[self.som.units])
		return (applied, applied2)
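	# Shape sketch for the graph above (inferred from how the tensors are
	# combined; B = batch size, U = ulen, C = nb_label):
	#   x = sim2units(...):        (B, U) unit activations
	#   neighbour:                 (U, B) kernel weights around each BMU
	#   x * transpose(neighbour):  (B, U) neighbourhood-weighted activations
	#   y = x @ W + biases:        (B, C) logits fed to the loss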
	
	def proba_class_op(self):
		# Class probabilities from the one-hot BMU encoding of the input.
		y = tf.matmul(self.datapred, self.W) + self.biases
		return tf.nn.softmax(y)

	def prediction_op(self, y):
		# Predicted class index = argmax over the class probabilities.
		return tf.argmax(y, 1)
	
	def get_W(self):
		return self.tf_object.run(self.W)
	
	def get_biases(self):
		return self.tf_object.run(self.biases)
	
	def train(self, data, labels, max_it=2000, batch_size=10, penality=0.001):
		nb_data = data.shape[0]
		pen = np.array([penality])
		for i in range(max_it):
			if self.verbose:
				print("It SLSOM: " + str(i))
			# Mini-batch of indices sampled with replacement.
			idx = np.random.randint(nb_data, size=batch_size)
			self.tf_object.run(self.train_op,
				feed_dict={
					self.som.data2pred: data[idx, :],
					self.labels: labels[idx],
					self.lambda_penality: pen,
					self.it_max: max_it
					})
			self.tf_object.run(self.update_it)
	
	def predict(self,data):
		pred,proba = self.tf_object.run([self.prediction,self.proba_data_op],
			feed_dict={
				self.som.data2pred:data
			})
		return (pred,proba)
	
	def proba_data(self,data):
		prob = self.tf_object.run(self.proba_data_op,
			feed_dict={
				self.som.data2pred:data
			})
		return prob
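
# End-to-end usage sketch, kept as comments because the SOM constructor lives
# in the companion SOM module and its exact signature is not shown here; the
# SLSOM calls follow the API defined in this file:
#
#   from .SOM import SOM
#   data = np.random.rand(200, 16)             # 200 samples, 16 features
#   labels = np.random.randint(3, size=200)    # 3 classes
#   som = SOM(...)                             # build and fit the SOM first
#   slsom = SLSOM(som, nb_label=3, loss_type='cross_entropy')
#   slsom.train(data, labels, max_it=1000, batch_size=10, penality=0.001)
#   pred, proba = slsom.predict(data)          # class indices, softmax probabilities
#   slsom.save("model_dir/")                   # writes W.txt, biases.txt, parameters.txt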