
I am currently trying to write a TensorFlow data input pipeline using TensorFlow queues. My data consists of JPEG images with three channels (RGB), each 128x128 pixels.

My current issue is that my image_batch operation keeps hanging when I run it, and I'm not sure why.

I have three main functions for building my input pipeline:

  1. read_my_file_format takes a filename_queue, loads the file, and resizes the image.
  2. tensorflow_queue takes a list of objects and builds a TensorFlow FIFOQueue. The queue is wrapped in a QueueRunner and registered via tf.train.add_queue_runner.
  3. shuffle_queue_batch is meant to return an operation that fetches a batch of images and labels.

Below is my code.

def read_my_file_format(filename_queue):
    reader = tf.WholeFileReader()
    filename, image_string = reader.read(filename_queue)
    image = tf.image.decode_jpeg(image_string, channels=3)
    image = tf.image.resize_images(image, size=[256, 256])
    return image

def tensorflow_queue(lst, dtype, capacity=32):
    tensor = tf.convert_to_tensor(lst, dtype=dtype)
    fq = tf.FIFOQueue(capacity=capacity, dtypes=dtype, shapes=(()))
    fq_enqueue_op = fq.enqueue_many([tensor])
    tf.train.add_queue_runner(tf.train.QueueRunner(fq, [fq_enqueue_op]*1))
    return fq

def shuffle_queue_batch(image, label, batch_size, capacity=32, min_after_dequeue=10, threads=1):
    tensor_list = [image, label]
    dtypes = [tf.float32, tf.int32]
    shapes = [image.get_shape(), label.get_shape()]
    rand_shuff_queue = tf.RandomShuffleQueue(
                                capacity=capacity,
                                min_after_dequeue=min_after_dequeue,
                                dtypes=dtypes,
                                shapes=shapes
                                )
    rand_shuff_enqueue_op = rand_shuff_queue.enqueue(tensor_list)
    tf.train.add_queue_runner(tf.train.QueueRunner(rand_shuff_queue, [rand_shuff_enqueue_op] * threads))

    image_batch, label_batch = rand_shuff_queue.dequeue_many(batch_size)
    return image_batch, label_batch

def input_pipeline(filenames, classes, min_after_dequeue=10):
    filename_queue = tf.train.string_input_producer(filenames, shuffle=False)
    classes_queue = tensorflow_queue(classes, tf.int32)
    image = read_my_file_format(filename_queue)
    label = classes_queue.dequeue()
    image_batch, label_batch = shuffle_queue_batch(image, label, BATCH_SIZE, min_after_dequeue=min_after_dequeue)

    return image_batch, label_batch


with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    # get_image_data returns:
    #    filenames is a list of strings of the filenames
    #    classes is a list of ints
    #    datasize = number of images in dataset
    filenames, classes, datasize = get_image_data()


    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    image_batch, label_batch = input_pipeline(filenames, classes)

    print('Starting training')
    for ep in range(NUM_EPOCHS):
        total_loss = 0
        for _ in range(datasize // BATCH_SIZE * BATCH_SIZE):
            print('fetching batch')
            x_batch = sess.run([image_batch])
            print('x batch')
            y_batch = sess.run([label_batch])
            x_batch, y_batch = sess.run([image_batch, label_batch])

Thank you in advance.

2 Answers


Your code is mostly correct; only a minor change is needed to make it work. The reason your code hangs is that you are starting the queue runners before you declare the queues. If you look at the return value of start_queue_runners, you will see that the list is empty.
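
A quick way to see this (a minimal check, not part of the original answer, reusing the question's input_pipeline, filenames and classes): the list returned by start_queue_runners is empty if it is called before the queues exist, and non-empty once input_pipeline has built them.

import tensorflow as tf

with tf.Session() as sess:
    # Called before any queues exist: there is nothing to start, the list is
    # empty, and any later dequeue would block forever.
    premature = tf.train.start_queue_runners(coord=tf.train.Coordinator())
    print(len(premature))  # 0

    # Build the pipeline first, then start the runners it registered via
    # tf.train.add_queue_runner and tf.train.string_input_producer.
    image_batch, label_batch = input_pipeline(filenames, classes)
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    print(len(threads))  # > 0, so sess.run(image_batch) can make progress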

Having said that, Alexandre's advice is good: tf.data is the way to get a high-performance input pipeline. Also, queue runners are not compatible with the new TF eager execution mechanism.
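
To illustrate that last point (a minimal sketch, not from the original answer, assuming a TF 1.x release recent enough that tf.enable_eager_execution exists and datasets are directly iterable under eager execution): with eager execution there are no sessions or queue runners at all, and a tf.data pipeline is consumed with a plain Python loop.

import tensorflow as tf

tf.enable_eager_execution()  # must run at program startup, before any graph is built

# Tiny in-memory dataset, purely to show the iteration style.
dataset = tf.data.Dataset.from_tensor_slices(([1.0, 2.0, 3.0], [0, 1, 0])).batch(2)

for features, labels in dataset:  # no Session, Coordinator or QueueRunner needed
    print(features.numpy(), labels.numpy())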

Relevant code follows:

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    # get_image_data returns:
    #    filenames is a list of strings of the filenames
    #    classes is a list of ints
    #    datasize = number of images in dataset
    filenames, classes, datasize = get_image_data()

    image_batch, label_batch = input_pipeline(filenames, classes)

    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    #image_batch, label_batch = input_pipeline(filenames, classes)  # moved above, before start_queue_runners

    print('Starting training')
    for ep in range(NUM_EPOCHS):
        total_loss = 0
        for _ in range(datasize // BATCH_SIZE * BATCH_SIZE):
            print('fetching batch')
            x_batch = sess.run([image_batch])
            print('x batch')
            y_batch = sess.run([label_batch])
            x_batch, y_batch = sess.run([image_batch, label_batch])

2 Comments

Thank you so much. That fixed my issue.
Please accept the answer if it fixed your issue. Thanks.

I strongly recommend you switch your input pipeline from the tf.train queues to tf.data. Queue-based input pipelines are inefficient and hard to maintain.
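
For reference, a rough sketch of what the question's pipeline could look like with tf.data (graph-mode TF 1.x API; filenames, classes, BATCH_SIZE and NUM_EPOCHS come from the question, the rest is illustrative rather than the only way to write it):

import tensorflow as tf

def parse_example(filename, label):
    # Mirrors read_my_file_format: read one JPEG, decode it and resize it.
    image_string = tf.read_file(filename)
    image = tf.image.decode_jpeg(image_string, channels=3)
    image = tf.image.resize_images(image, size=[256, 256])
    return image, label

dataset = (tf.data.Dataset.from_tensor_slices((filenames, classes))
           .shuffle(buffer_size=len(filenames))
           .map(parse_example)
           .batch(BATCH_SIZE)
           .repeat(NUM_EPOCHS))

iterator = dataset.make_one_shot_iterator()
image_batch, label_batch = iterator.get_next()

with tf.Session() as sess:
    x_batch, y_batch = sess.run([image_batch, label_batch])  # no queue runners to start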

1 Comment

I implemented both pipelines, one using queues and one using tf.data. I'd say that using tf.data was much more intuitive.
