SOM.py 7.6 KB
import numpy as np
import tensorflow as tf
from .tf_util import tf_base
from .util import check_dir

def load_SOM_base(path):
	parameter = {}
	with open(path+"parameters.txt") as f:
		for line in f:
			splitted = line.split("\t")
			parameter[splitted[0]] = splitted[1]
	return parameter

def init_SOM(path):
	para = load_SOM_base(path)
	tmp = SOM(m=int(para["m"]),n=int(para["n"]),unit_width=int(para["unit_width"]))
	return tmp

############
# SOM base #
############

class SOM_basic(object):
	def __init__(self,m,n,unit_width,data_train_op=None,data_test_op=None,tf_object=None,init=True,verbose=False):
		if tf_object is None:
			self.tf_object = tf_base()
		else:
			self.tf_object = tf_object
		self.dim = (m,n)
		self.ulen = m*n
		self.unit_width = unit_width
		self.verbose=verbose
		if init:
			with self.tf_object.graph.as_default():
				self.init_data(data_train_op,data_test_op)
				self.init_op_learn()
				self.prediction = self.prediction_op(self.data2pred)
				self.data_dist = self.dist2units(self.data2pred)
				self.data_sim = self.sim2units(self.data2pred)
				self.bmu = self.bmu_finder(self.data2pred,self.units)
				self.learn = self.learn_op()
				self.summary = tf.summary.merge_all()
	
	def save(self,path):
		check_dir(path)
		units = self.get_units()
		np.savetxt(path+'units.txt',np.array(units))
		with open(path+"parameters.txt","w") as f:
			towrite = ""
			towrite += "m\t"+str(self.dim[0])+"\n"
			towrite += "n\t"+str(self.dim[1])+"\n"
			towrite += "unit_width\t"+str(self.unit_width)+"\n"
			f.write(towrite)
	
	def load(self,path):
		units = np.loadtxt(path+'units.txt')
		self.tf_object.run(self.load_units,feed_dict={self.units_loader:units})
	
	def init_data(self,train,test):
		if train is None:
			self.data = tf.placeholder(tf.float64,shape=[None,self.unit_width])
		else:
			self.data = train
		if test is None:
			self.data2pred = tf.placeholder(tf.float64,shape=[None,self.unit_width])
		else:
			self.data2pred = test
	
	def init_unit(self):
		vals = np.random.rand(self.ulen,self.unit_width)
		vals -= vals.min()
		vals /= np.sum(vals,axis=0,keepdims=True)
		vals /= np.sqrt(np.sum(np.power(vals,2.0),axis=1,keepdims=True))
		return vals
	
	def init_op_learn(self):
		self.units = tf.Variable(self.init_unit(),dtype=tf.float64)
		self.units_loader = tf.placeholder(tf.float64,shape=[self.ulen,self.unit_width])
		self.load_units = self.units.assign(self.units_loader)
		self.units_position = tf.constant(self.grid_creation(),dtype=tf.int32,shape=[self.ulen,2])
		if self.verbose:
			tf.summary.tensor_summary('Units',self.units)
	
	def get_units(self):
		return self.tf_object.run(self.units)
	
	def get_unit_position(self):
		return self.tf_object.run(self.units_position)
	
	def grid_creation(self):
		pos = np.asarray([[i,j] for i in range(self.dim[0]) for j in range(self.dim[1])])
		return pos
	
	def R(self,x,coeff=None):
		if coeff is None:
			coeff = self.coeff_neighbour
		return tf.exp(
			tf.div(
				-tf.pow(tf.cast(x,tf.float64),2.0),
				tf.reshape(tf.cast(coeff,tf.float64),[1])
			)
		)
	def normalize(self,v):
		tmp = v / tf.sqrt(tf.reduce_sum(tf.pow(v,2.0),1,keep_dims=True))
		return tf.where(tf.is_nan(tmp),tf.zeros_like(tmp),tmp)
	
	def sim2units(self,data,units=None):
		if units is None:
			units = self.units
		dist = self.dist2units(data,units)
#		max_dist = tf.reduce_max(tf.sqrt(tf.reduce_sum(tf.pow((tf.expand_dims(units,0) - tf.expand_dims(units,1)),2.0),axis=2)))
#		gamma = 1.0 / (max_dist/np.sqrt(2.0*self.ulen))
#		res = 1.0 / (tf.pow(dist,2.0)+1.0)
		gamma = 1.0
		res = tf.exp(-gamma*0.5*tf.pow(dist,2.0))
		return res
	
#	def sim2units_neighbour(self,data,units=None):
#		if units is None:
#			units = self.units
#		dist = self.dist2units(data,units)
##		max_dist = tf.reduce_max(tf.sqrt(tf.reduce_sum(tf.pow((tf.expand_dims(units,0) - tf.expand_dims(units,1)),2.0),axis=2)))
##		gamma = 1.0 / (max_dist/np.sqrt(2.0*self.ulen))
##		res = 1.0 / (dist+1.0)
#		gamma = 1.0
#		res = tf.exp(-gamma*tf.pow(dist,2.0))
#		bmus = tf.argmin(dist,1)
#		dist_bmu = self.dist_bmus_op(bmus)
#		neighbour = self.R(dist_bmu,tf.cast(self.learning_rate(self.it),tf.float64)*max(self.dim)/2.0)
#		return res*tf.transpose(neighbour)
	
	def dist2units(self,data,units=None):
		if units is None:
			units = self.units
		return tf.sqrt(tf.reduce_sum(tf.pow(tf.expand_dims(data,1) - tf.expand_dims(units,0),2.0),2))
#		return tf.map_fn(
#			lambda x: tf.sqrt(tf.reduce_sum(tf.pow(units-x,2.0),1)),
#			data,
#			back_prop=False
#		)
	
	def dist_bmus_op(self,bmus):
		pos_bmus = tf.concat(
			[
				tf.expand_dims(tf.floordiv(bmus,self.dim[1]),1),
				tf.expand_dims(tf.mod(bmus,self.dim[1]),1)
			],1
		)
		dist_bmus = tf.sqrt(tf.reduce_sum(
			tf.pow(
				tf.cast(tf.subtract(
					tf.cast(tf.expand_dims(self.units_position,1),tf.int64),
					tf.expand_dims(pos_bmus,0)
				),tf.float64),
				2.0
			),
			2
		))
		return dist_bmus
	
	def bmu_finder(self,data,units):
		return tf.argmin(
			self.dist2units(data,units),
			1
		)
	
	def learn_op(self):
		learn = tf.while_loop(
			self.learning_cond,
			self.learning_process,
			self.learning_var(),
			parallel_iterations=1,
			back_prop=False
		)
		return learn
	
	def learning_cond(self,p):
		# create infinite loop
		# need to be implemented in child classes
		return True
	
	def learning_process(self,p):
		units = p[0]
		bmus = self.bmu_finder(self.data,units)
		update_units = self.update_units(bmus)
		return [[update_units]]
	
	def learning_var(self):
		return[[self.units]]
	
	def prediction_op(self,data):
		return self.bmu_finder(data,self.units)
	
	def repartition_map(self,data,label):
		classes = np.array(np.unique(label))
		nb_class = len(classes)
		pred = np.array(self.get_BMUS(data))
		rep = np.asarray(
			[
				[
					np.sum(np.logical_and(pred == i,label==classes[j]))
					for j in range(nb_class)
				]
				for i in range(self.ulen)
			])
		return ([classes,rep],pred)
	
	def update_units(bmus):
		return self.units
	
	def get_data_dist(self,data):
		tmp = np.asarray(self.tf_object.run(self.data_dist,feed_dict={self.data2pred:data}))
		return tmp
	
	def get_data_sim(self,data):
		tmp = np.asarray(self.tf_object.run(self.data_sim,feed_dict={self.data2pred:data}))
		return tmp
	
	def get_BMUS(self,data):
		return self.tf_object.run(self.bmu,feed_dict={self.data2pred:data})
	
	def predict(self,data):
		pred = self.tf_object.run(self.prediction,feed_dict={self.data2pred:data})
		return pred

############################
# SOM basic implementation #
############################

class SOM(SOM_basic):
	def __init__(self,it_max_op=None,**kwargs):
		self.it_max = it_max_op
		super().__init__(**kwargs)
	
	def init_op_learn(self):
		super().init_op_learn()
		if self.it_max is None:
			self.it_max = tf.placeholder(tf.int32)
		self.it = tf.Variable(1,dtype=tf.int32)
	
	def learning_var(self):
		return [super().learning_var()[0]+[self.it]]
	
	def learning_cond(self,p):
		it = tf.Print(p[1],[p[1]],'It : ') if self.verbose else p[1]
		return tf.reduce_all(it < self.it_max)
	
	def learning_process(self,p):
		units = p[0]
		it = p[1]
		data = tf.random_crop(self.data,[1,self.unit_width])
		bmu = self.bmu_finder(data,units)
		dist_bmu = self.dist_bmus_op(bmu)
		lr = tf.cast(self.learning_rate(it),tf.float64)
		neighbour = self.R(dist_bmu,lr*max(self.dim)/2.0)
		dist_obs = tf.subtract(
			data,
			units
		)
		update_units = self.units.assign_add(lr*neighbour*dist_obs)
		return [[update_units,self.it.assign_add(1)]]
	
	def learning_rate(self,it):
		return tf.cast(self.it_max,tf.float32)/(tf.cast(self.it,tf.float32))
		
	def train(self,limit_it,data):
		feed = {self.it_max:limit_it,self.data:data}
		learn = self.tf_object.run(self.learn,feed_dict=feed)
		return learn
	
	def get_it(self):
		return self.tf_object.run(self.it)