We have a TensorFlow application in which we feed data via queues in batches of 250. After moving to tf.VarLenFeature (instead of tf.FixedLenFeature) we started to see a memory leak during training: memory usage grows constantly. We train our models on GPU machines.
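For reference, tf.VarLenFeature parses into a tf.SparseTensor (indices, values, dense_shape) rather than the dense Tensor that tf.FixedLenFeature yields, which is why the sparse-to-dense step below exists. A minimal self-contained illustration (the feature keys and values here are made up, not our real schema):

import tensorflow as tf

# One serialized tf.train.Example with a variable-length float list.
example = tf.train.Example(features=tf.train.Features(feature={
    "recs": tf.train.Feature(float_list=tf.train.FloatList(value=[1.0, 2.0, 3.0])),
    "click": tf.train.Feature(float_list=tf.train.FloatList(value=[1.0])),
}))
serialized = tf.constant([example.SerializeToString()])

parsed = tf.parse_example(serialized, features={
    "recs": tf.VarLenFeature(tf.float32),         # -> tf.SparseTensor
    "click": tf.FixedLenFeature([], tf.float32),  # -> dense tf.Tensor
})
print(type(parsed["recs"]))   # SparseTensor
print(type(parsed["click"]))  # Tensor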
This is the decode code:
@staticmethod
def decode(serialized_example):
    features = tf.parse_example(
        serialized_example,
        # Defaults are not specified since both keys are required.
        features={
            # target_features
            RECS: tf.VarLenFeature(tf.float32),
            CLICK: tf.FixedLenFeature([], tf.float32)
        })
    return features
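The decode method is fed batches of serialized records from TFRecord files; the reader side looks roughly like this (a hypothetical sketch, the file name and the Decoder class name are placeholders):

filename_queue = tf.train.string_input_producer(["data.tfrecord"])  # placeholder file
reader = tf.TFRecordReader()
_, serialized = reader.read_up_to(filename_queue, num_records=250)
features = Decoder.decode(serialized)  # RECS -> SparseTensor, CLICK -> dense Tensor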
Then we convert each sparse feature to dense using:

tf.identity(tf.sparse_tensor_to_dense(tensor), name=key)

and then pull batches through TensorFlow queues.
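The conversion loops over the parsed feature dict; a minimal sketch (the densify helper name is ours, not from our real code):

def densify(features):
    # Convert every SparseTensor produced by VarLenFeature to a dense
    # tensor; FixedLenFeature outputs are already dense and pass through.
    dense_features = {}
    for key, tensor in features.items():
        if isinstance(tensor, tf.SparseTensor):
            tensor = tf.sparse_tensor_to_dense(tensor)
        dense_features[key] = tf.identity(tensor, name=key)
    return dense_features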
This is the create queue code:
# Module-level imports; these helpers live in TensorFlow's private
# tensorflow.python.training.input module.
from tensorflow.python.framework import ops
from tensorflow.python.ops import data_flow_ops
from tensorflow.python.training.input import (
    _as_tensor_list, _dtypes, _enqueue, _shapes, _store_sparse_tensors,
    _validate)

@staticmethod
def create_queue(tensors, capacity, shuffle=False, min_after_dequeue=None,
                 seed=None, enqueue_many=False, shapes=None, shared_name=None,
                 name=None):
    tensor_list = _as_tensor_list(tensors)
    with ops.name_scope(name, "shuffle_batch_queue", list(tensor_list)):
        tensor_list = _validate(tensor_list)
        tensor_list, sparse_info = _store_sparse_tensors(
            tensor_list, enqueue_many, tf.constant(True))
        map_op = [x.map_op for x in sparse_info]
        types = _dtypes([tensor_list])
        shapes = _shapes([tensor_list], shapes, enqueue_many)
        queue = data_flow_ops.RandomShuffleQueue(
            capacity=capacity, min_after_dequeue=min_after_dequeue, seed=seed,
            dtypes=types, shapes=shapes, shared_name=shared_name)
        return queue, sparse_info, map_op
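For context, _store_sparse_tensors swaps each SparseTensor in the list for a scalar string handle pointing into an internal SparseTensorsMap, which is why the queue only needs plain dtypes and shapes; sparse_info records how to rebuild the sparse tensors after dequeue. Our dequeue side is roughly the following sketch (paraphrased, using the matching private helpers _restore_sparse_tensors and _as_original_type from the same module):

from tensorflow.python.training.input import (
    _as_original_type, _restore_sparse_tensors)

@staticmethod
def dequeue(queue, tensors, sparse_info, batch_size):
    # Pull a batch of dense tensors / sparse handles off the queue, then
    # rebuild the original SparseTensors from the SparseTensorsMap.
    dequeued = queue.dequeue_many(batch_size)
    dequeued = _restore_sparse_tensors(dequeued, sparse_info)
    return _as_original_type(tensors, dequeued)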
And the enqueue operation is:
@staticmethod
def enqueue(queue, tensors, num_threads, enqueue_many=False, name=None,
            map_op=None):
    tensor_list = _as_tensor_list(tensors)
    with ops.name_scope(name, "shuffle_batch_enqueue", list(tensor_list)):
        tensor_list = _validate(tensor_list)
        tensor_list, sparse_info = _store_sparse_tensors(
            tensor_list, enqueue_many, tf.constant(True), map_op)
        _enqueue(queue, tensor_list, num_threads, enqueue_many, tf.constant(True))
        return queue, sparse_info
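Putting it together, the training wiring is roughly the following sketch (the Batcher class name, thread count, and train_op are placeholders; note that _enqueue registers a QueueRunner in the QUEUE_RUNNERS collection, which is why tf.train.start_queue_runners picks it up):

queue, sparse_info, map_op = Batcher.create_queue(
    dense_features, capacity=2500, min_after_dequeue=1250)
Batcher.enqueue(queue, dense_features, num_threads=4, map_op=map_op)
batch = Batcher.dequeue(queue, dense_features, sparse_info, batch_size=250)

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)
    try:
        while not coord.should_stop():
            sess.run(train_op)  # train_op is a placeholder for our training step
    finally:
        coord.request_stop()
        coord.join(threads)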