I am currently trying to write a Tensorflow data input pipeline using tensorflow queues. My data consists of jpg images, three channels (RGB), and is 128x128 pixels.
My current issue is running my image_batch operation because the operation keeps halting and I'm not sure why.
Below is my code for building my input pipeline.
I have three main functions that I'm using:
read_my_file_formattakes in a filename_queue and attempts to load the file and resize ittensorflow_queuetakes a list of objects and generates a tensorflow FIFO queue. The queue is then added to a queuerunner and added to tf.train.add_queue_runnershuffle_queue_batchis meant to return an operation that fetches a batch of images and labels.
Below is my code.
def read_my_file_format(filename_queue):
reader = tf.WholeFileReader()
filename, image_string = reader.read(filename_queue)
image = tf.image.decode_jpeg(image_string, channels=3)
image = tf.image.resize_images(image, size=[256, 256])
return image
def tensorflow_queue(lst, dtype, capacity=32):
tensor = tf.convert_to_tensor(lst, dtype=dtype)
fq = tf.FIFOQueue(capacity=capacity, dtypes=dtype, shapes=(()))
fq_enqueue_op = fq.enqueue_many([tensor])
tf.train.add_queue_runner(tf.train.QueueRunner(fq, [fq_enqueue_op]*1))
return fq
def shuffle_queue_batch(image, label, batch_size, capacity=32, min_after_dequeue=10, threads=1):
tensor_list = [image, label]
dtypes = [tf.float32, tf.int32]
shapes = [image.get_shape(), label.get_shape()]
rand_shuff_queue = tf.RandomShuffleQueue(
capacity=capacity,
min_after_dequeue=min_after_dequeue,
dtypes=dtypes,
shapes=shapes
)
rand_shuff_enqueue_op = rand_shuff_queue.enqueue(tensor_list)
tf.train.add_queue_runner(tf.train.QueueRunner(rand_shuff_queue, [rand_shuff_enqueue_op] * threads))
image_batch, label_batch = rand_shuff_queue.dequeue_many(batch_size)
return image_batch, label_batch
def input_pipeline(filenames, classes, min_after_dequeue=10):
filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
classes_queue = tensorflow_queue(classes, tf.int32)
image = read_my_file_format(filename_queue)
label = classes_queue.dequeue()
image_batch, label_batch = shuffle_queue_batch(image, label, BATCH_SIZE, min_after_dequeue=min_after_dequeue)
return image_batch, label_batch
with tf.Session() as sess:
sess.run(tf.global_variables_initializer())
# get_image_data returns:
# filenames is a list of strings of the filenames
# classes is a list of ints
# datasize = number of images in dataset
filenames, classes, datasize = get_image_data()
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
image_batch, label_batch = input_pipeline(filenames, classes)
print('Starting training')
for ep in range(NUM_EPOCHS):
total_loss = 0
for _ in range(datasize // BATCH_SIZE * BATCH_SIZE):
print('fetching batch')
x_batch = sess.run([image_batch])
print('x batch')
y_batch = sess.run([label_batch])
x_batch, y_batch = sess.run([image_batch, label_batch])
Thank you in advance.