如何在tensorflow中使用自定义python函数预取数据

我正在尝试预取训练数据以隐藏I/O延迟。我想编写自定义Python代码,从磁盘加载数据并预处理数据(例如,通过添加上下文窗口)。换句话说,一个线程进行数据预处理,另一个线程进行训练。这在TensorFlow中可能吗

更新:我有一个基于@mrry示例的工作示例

将numpy导入为np
导入tensorflow作为tf
导入线程
批量大小=5
培训费用=4100
feature_input=tf.placeholder(tf.float32,shape=[128])
label\u input=tf.placeholder(tf.float32,shape=[128])
q=tf.FIFOQueue(200[tf.float32,tf.float32],shapes=[[128],[128])
enqueue_op=q.enqueue([标签输入,特征输入])
标签批次,特征批次=q.dequeue多个(批次大小)
c=tf.重塑(特征批次,[批次大小,128])+tf.重塑(标签批次,[批次大小,128]))
sess=tf.Session()
def加载和排队(sess、排队操作、协调):
打开('dummy_data/features.bin')作为特征文件,打开('dummy_data/labels.bin')作为标签文件:
当不协调时,是否应停止()
feature\u array=np.fromfile(feature\u文件,np.float32,128)
如果特征_array.shape[0]==0:
打印('到达文件末尾,使用seek(0,0)重置)
feature_file.seek(0,0)
label_file.seek(0,0)
持续
label\u value=np.fromfile(label\u文件,np.float32,128)
run(排队运行,提要dict={feature\u输入:feature\u数组,
标签\输入:标签\值})
coord=tf.train.Coordinator()
t=threading.Thread(target=load_和_enqueue,args=(sess,enqueue_op,coord))
t、 开始()
对于范围内的i(培训):
总和=sess.run(c)
打印('train_iter='+str(i))
打印(总和)
协调请求停止()
坐标连接([t])

这是一个常见的用例,大多数实现都使用TensorFlow的队列将预处理代码与训练代码解耦。有一个关于如何使用队列的教程,但主要步骤如下:

  1. 定义一个队列,q,它将缓冲预处理的数据。TensorFlow支持简单的tf.FIFOQueue,它按元素排队的顺序生成元素,支持更高级的tf.RandomShuffleQueue,它按随机顺序生成元素。队列元素是一个或多个张量的元组(可以有不同的类型和形状)。所有队列都支持单个元素(enqueuedequeue)和批(enqueue\u manydequeue\u many)操作,但要使用批处理操作,必须在构造队列时指定队列元素中每个张量的形状

  2. 构建一个子图,将预处理的元素排入队列。一种方法是为对应于单个输入示例的张量定义一些tf.placeholder()ops,然后将它们传递给q.enqueue()。(如果预处理一次生成一个批,则应改用q.enqueue_many())也可以在此子图中包含TensorFlow ops

  3. 构建一个子图来执行训练。这将看起来像一个常规的TensorFlow图,但将通过调用q.dequeue\u many(BATCH\u SIZE)获得其输入

  4. 开始您的会话

  5. 创建一个或多个执行预处理逻辑的线程,然后执行排队操作,输入预处理的数据。您可能会发现tf.train.Coordinatortf.train.QueueRunner实用程序类对此很有用

  6. 正常运行训练图(优化器等)

编辑:这里有一个简单的加载并排队()函数和代码片段,让您开始:

#特征是长度为100的浮点向量
feature_input=tf.placeholder(tf.float32,shape=[100])
#标签是标量整数。
label\u input=tf.placeholder(tf.int32,shape=[])
#或者,可以这样做:
#feature\u batch\u input=tf.placeholder(tf.float32,shape=[None,100])
#label\u batch\u input=tf.placeholder(tf.int32,shape=[None])
q=tf.FIFOQueue(100[tf.float32,tf.int32],shapes=[100],]
enqueue_op=q.enqueue([特征输入,标签输入])
#对于批量输入,请执行以下操作:
#enqueue\u op=q.enqueue\u many([特征\u批次\u输入,标签\u批次\u输入])
特征批次,标签批次=q.dequeue多(批次大小)
#以标签批次、特征批次作为输入构建模型的其余部分。
# [...]
列车运行时间=。。。
sess=tf.Session()
def load_和_enqueue():
打开(…)作为要素文件,打开(…)作为标签文件:
尽管如此:
feature\u array=numpy.fromfile(feature\u文件,numpy.float32100)
如果不是功能单元阵列:
回来
label_value=numpy.fromfile(feature_文件,numpy.int32,1)[0]
run(排队运行,提要dict={feature\u输入:feature\u数组,
标签\输入:标签\值})
#启动一个线程以异步方式将数据排队,并隐藏I/O延迟。
t=线程。线程(目标=加载和排队)
t、 开始()
对于范围内(训练时期):
列车运行(列车运行)

发表评论