I am trying to use tf.data.TextLineDataset to read from a CSV file, shard the dataset over multiple worker nodes, and then create an iterator to feed the data in batches. I followed the programmer's guide on tf.data from TensorFlow (https://www.tensorflow.org/programmers_guide/datasets). When I run the code in a TF session, I get the following error:
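For context, this is my understanding of what sharding should do, as a plain-Python sketch (shard_py is just my own stand-in for Dataset.shard, not a TensorFlow function): each worker keeps the elements whose position modulo num_shards equals its index.

```python
def shard_py(elements, num_shards, index):
    # Mimics Dataset.shard(num_shards, index): keep every num_shards-th
    # element, offset by index, so each worker sees a disjoint slice.
    return [e for i, e in enumerate(elements) if i % num_shards == index]

rows = ["row0", "row1", "row2", "row3", "row4", "row5"]
print(shard_py(rows, num_shards=3, index=0))  # ['row0', 'row3']
print(shard_py(rows, num_shards=3, index=1))  # ['row1', 'row4']
```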
*** tensorflow.python.framework.errors_impl.NotFoundError: Date,Open,High,Low,Last,Close,Total Trade Quantity,Turnover,close_pct_change_1d,KAMA7-KAMA30,KAMA15-KAMA30,HT_QUAD,TURNOVER,BOP,MFI,MINUS_DI,ROCP,STOCH_SLOWK,NATR,EMA7-EMA30-1d,DX-1d,PPO-1d,NATR-1d,HT_INPHASOR-2d,day_0,day_1,day_2,day_3; No such file or directory
[[Node: IteratorGetNext_5 = IteratorGetNext[output_shapes=[[], [], [], [], [], ..., [], [], [], [], []], output_types=[DT_INT32, DT_INT32, DT_INT32, DT_INT32, DT_INT32, ..., DT_INT32, DT_INT32, DT_INT32, DT_INT32, DT_INT32], _device="/job:localhost/replica:0/task:0/device:CPU:0"](Iterator_8)]]
Now, "Date", "Open", "High" etc. are the columns in the dataset I want to load. Thus, I know that the error is not related to loading the dataset.
When loading the dataset, I use tf.data.TextLineDataset(file).skip(1), but according to the error it does not seem to skip the first line of my dataset (the column headers).
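To illustrate what I expect skip(1) to do, here is a plain-Python sketch (skip_py is my own helper, not TensorFlow code): it should simply drop the first element, i.e. the header row, and leave the data rows untouched.

```python
import itertools

def skip_py(lines, n):
    # Mimics Dataset.skip(n): drop the first n elements of the stream.
    return list(itertools.islice(lines, n, None))

csv_lines = [
    "Date,Open,High,Low",        # header row I want skipped
    "2018-01-02,100,101,99",     # data rows I want kept
    "2018-01-03,101,102,100",
]
print(skip_py(csv_lines, 1))  # header gone, both data rows remain
```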
Does anybody know where this error comes from, and does anybody have a fix for it?
Please see the following code for clarification:
def create_pipeline(bs, nr, ep):
    def _X_parse_csv(file):
        record_defaults = [[0]] * 20
        splits = tf.decode_csv(file, record_defaults)
        input = splits
        return input

    def _y_parse_csv(file):
        record_defaults = [[0]] * 20
        splits = tf.decode_csv(file, record_defaults)
        label = splits[0]
        return label

    # Dataset for input data
    file = tf.gfile.Glob("./NSEOIL.csv")
    num_workers = 1  # for testing; simulate 1 node for sharding below
    task_index = 0

    ds_file = tf.data.TextLineDataset(file)
    ds = ds_file.flat_map(lambda file: (tf.data.TextLineDataset(file).skip(1)))  # remove CSV headers
    ds = ds.shard(num_workers, task_index).repeat(ep)
    X_train = ds.map(_X_parse_csv)

    ds = ds_file.flat_map(lambda file: (tf.data.TextLineDataset(file).skip(2)))  # remove CSV headers + shift forward 1 day
    ds = ds.shard(num_workers, task_index).repeat(ep)
    y_train = ds.map(_y_parse_csv)

    X_iterator = X_train.make_initializable_iterator()
    y_iterator = y_train.make_initializable_iterator()
    return X_iterator, y_iterator