热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

tf多线程读取数据

多线程读取数据的机制tf中多线程读取数据跟常规的python多线程思路一致,是基于Queue的多线程编程。主线程读取数据,然后计算,在读数

多线程读取数据的机制

tf中多线程读取数据跟常规的python多线程思路一致,是基于Queue的多线程编程。

主线程读取数据,然后计算,在读数据这部分有两个线程,一个线程读取文件名,生成文件名队列,另一个线程从文件名队列中获取文件名,并读取相应文件,生成数据队列。

图示如下

tensorflow 在队列中加入“结束”标记符,当读取线程检测到该标记符时,会抛出异常OutOfRange,后续代码会捕捉该异常,从而结束线程。

 

读取文件的相应函数

tf.train.string_input_producer(filelist, shuffle, num_epochs)

用于生成文件名队列,3个参数,filelist代表文件名list,num_epochs表示epochs数,shuffle是指在一个epoch内,文件的顺序是否被打乱。

reader对象

reader对象由文件类型对应的读取方法生成,该对象会自动读取文件,并创建数据队列,输出key/文件名,value/文件内容

tf.train.start_queue_runners

注意,在调用tf.train.string_input_producer后,文件名并没有被真正加入文件名队列,而只是创建了一个空队列,此时如果直接计算,系统会陷入阻塞状态。

此时需要启动队列,就是调用tf.train.start_queue_runners。

 

读取图片示例

# encoding:utf-8
__author__ = 'HP'
import tensorflow as tf# 新建一个Session
with tf.Session() as sess:# 文件名listfilename = ['2.png', '3.png']# string_input_producer会产生一个文件名队列filename_queue = tf.train.string_input_producer(filename, shuffle=False, num_epochs=2)# reader从文件名队列中读数据。对应的方法是reader.readreader = tf.WholeFileReader()key, value = reader.read(filename_queue)# tf.train.string_input_producer定义了一个epoch变量,要对它进行初始化
tf.local_variables_initializer().run()# 使用start_queue_runners之后,才会开始填充队列threads = tf.train.start_queue_runners(sess=sess)i = 0while True:i += 1# 获取图片数据并保存image_data = sess.run(value)with open('test_%d.jpg' % i, 'wb') as f:f.write(image_data)

输出四张图片

两张图片,两个epoch,生成了4张图片。(图片每次读取全部数据)

 

队列

上面提到各种队列,tf与python一样,有多种队列

tf.FIFOQueue  先入先出

tf.RandomShuffleQueue  随机出队

tf.PaddingFIFOQueue   以固定长度批量出列的队列

tf.PriorityQueue   带优先级出列的队列

使用逻辑都类似

 

tf.FIFOQueue(capacity, dtypes, shapes=None, names=None ...)

 

简单例子 (需要理解python的队列使用)

import tensorflow as tftf.InteractiveSession()q = tf.FIFOQueue(2, "float") # 最多2个元素
init = q.enqueue_many(([0,0],)) # 初始化队列

x
= q.dequeue() # get
y = x+1
q_inc
= q.enqueue([y]) # put

init.run()
## 初始化队列[0, 0]
#
print(x.eval()) # 0.0
#
print(x.eval()) # 0.0

q_inc.run()
## get 0 +1 put 1, 队列变成 [0, 1]
q_inc.run() ## get 0 +1 put 1,队列变成 [1, 1]
q_inc.run() ## get 1 +1 put 2,队列变成 [1, 2]
print(x.eval()) ## get 1
print(x.eval()) # get 2.0
x.eval() # 阻塞

enqueue_many 生成队列,enqueue put元素,dequeue get元素

 

 tf多线程

tf提供了两个类来实现多线程协同的功能。分别是 tf.Coordinator() and tf.QueueRunner()。

 

tf.Coordinator()

tf.Coordinator()主要用于协同多个线程一起停止,包括  should_stop、  request_stop、 join 三个接口。

实现机制:

在启动线程前,先创建Coordinator类的实例对象coord,并将coord传给每一个线程,每个线程要不断检测这个实例的 should_stop 方法,如果返回True,就停止,

并且可以启动这个实例的 request_stop 方法(任意线程都可随时启动这个方法),这个方法会通知其他线程,一起结束,而且一旦这个方法被调用,should_stop方法就会返回True,其他线程都会结束。

 

示例代码

__author__ = 'HP'
# coding utf-8
import tensorflow as tf
import numpy as np
import threading
import time# 线程中运行的程序,这个程序每隔1秒判断是否需要停止并打印自己的ID。
def MyLoop(coord, worker_id):# 使用tf.Coordinator类提供的协同工具判断当前线程是否需要停止并打印自己的IDwhile not coord.should_stop():# 人为制造一个停止的条件if np.random.rand() <0.1:print(&#39;Stoping from id: %d\n&#39; % worker_id)# 调用coord.request_stop()函数来通知其他线程停止
coord.request_stop()else:# 打印当前线程的IDprint(&#39;Working on id: %d\n&#39; % worker_id)# 暂停1秒time.sleep(1)# 声明一个tf.train.Coordinator类来协同多个线程
coord &#61; tf.train.Coordinator()
# 声明创建5个线程
threads &#61; [threading.Thread(target&#61;MyLoop, args&#61;(coord, i, )) for i in range(5)]
# 启动所有的线程
for t in threads: t.start()
# 等待所有线程退出
coord.join(threads)

输出

可以看到&#xff0c;一旦一个线程结束了&#xff0c;其他线程也跟着结束。

 

tf.QueueRunner()

tf.QueueRunner()用来同时启动多个线程操作同一个队列。并借助 tf.Coordinator()对线程进行管理。

 

示例代码

import tensorflow as tf# 声明一个先进先出的队列&#xff0c;队列中最多n个元素&#xff0c;类型
queue &#61; tf .FIFOQueue(10, &#39;float&#39;)
# 定义队列的入队操作
enqueue_op &#61; queue.enqueue([tf.random_normal([1])])# 使用 tf.train.QueueRunner来创建多个线程运行队列的入队操作
#
tf.train.QueueRunner给出了被操作的队列&#xff0c;[enqueue_op] * 5
#
表示了需要启动5个线程&#xff0c;每个线程中运行的是enqueue_op操作
qr &#61; tf.train.QueueRunner(queue, [enqueue_op] * 5)
# 将定义过的QueueRunner加入TensorFlow计算图上指定的集合
#
tf.train.add_queue_runner函数没有指定集合&#xff0c;
#
则加入默认集合tf.GraphKeys.QUEUE_RUNNERS。
#
下面的函数就是将刚刚定义的qr加入默认的tf.GraphKeys.QUEUE_RUNNERS结合
tf.train.add_queue_runner(qr)
# 定义出队操作
out_tensor &#61; queue.dequeue()with tf.Session() as sess:# 使用tf.train.Coordinator来协同启动的线程coord &#61; tf.train.Coordinator()# 使用tf.train.QueueRunner时&#xff0c;需要明确调用tf.train.start_queue_runners# 来启动所有线程。否则因为没有线程运行入队操作&#xff0c;当调用出队操作时&#xff0c;程序一直等待# 入队操作被运行。tf.train.start_queue_runners函数会默认启动# tf.GraphKeys.QUEUE_RUNNERS中所有QueueRunner.因为这个函数只支持启动指定集合中的QueueRunner,# 所以一般来说tf.train.add_queue_runner函数和tf.train.start_queue_runners函数会指定同一个结合threads &#61; tf.train.start_queue_runners(sess&#61;sess, coord&#61;coord)# 获取队列中的取值for _ in range(11): print(sess.run(out_tensor)[0])# 使用tf.train.Coordinator来停止所有线程
coord.request_stop()coord.join(threads)

同时启动多个线程&#xff0c;每个线程每次运行都将一个随机数写入队列&#xff0c;所以每次都能取到一个随机数。

 

用法小结

1. 创建多线程&#xff0c;方法是 tf.QueueRunner(aim_queue&#xff0c; [operation] * thread_num)&#xff0c;目标队列&#xff0c;创建n个线程&#xff0c;执行某操作

2. 然后将多线程加入 tensorflow 节点&#xff0c;

3. 然后显示的调用 start_queue_runners 启动所有线程

 

tf 多线程读取文件

csv

import tensorflow as tf# 文件名队列
filename_queue &#61; tf.train.string_input_producer(["xx1.csv", "xx2.csv"], shuffle&#61;False)# reader对象 &#xff08;文件阅读器&#xff09;
reader &#61; tf.TextLineReader()
key, value
&#61; reader.read(filename_queue)# 为数据设定默认格式&#xff0c;如果出现空值&#xff0c;就替换为这种格式的默认值。
#
注意格式必须一样&#xff0c;因为输出的格式是统一的&#xff0c;如下
#
[array([ 4. , 0. , 34.322323, 1. ], dtype&#61;float32)]
record_defaults &#61; [[1.], [1.], [1.], [1.]]
col1, col2, col3, col4
&#61; tf.decode_csv(value, record_defaults&#61;record_defaults) # decode_csv
features &#61; tf.concat([[col1], [col2], [col3], [col4]], 0)with tf.Session() as sess:# Start populating the filename queue.coord &#61; tf.train.Coordinator()threads &#61; tf.train.start_queue_runners(coord&#61;coord) # 启动队列&#xff0c;相当于是启动了多个线程&#xff0c;并将coord传入每个线程for i in range(12):example &#61; sess.run([features])print(example)coord.request_stop() #
coord.join(threads) # 等待结束

文件阅读器每次从文件内读取一行&#xff0c;如果有空值&#xff0c;就根据默认格式自动填补&#xff0c;decode_csv 将读取内容解析成张量。

将上述代码与QueueRunner代码对比&#xff0c;不难发现&#xff0c;其实 string_input_producer 生成的就是一个 QueueRunner

 

 

 参考资料&#xff1a;

http://www.cnblogs.com/demian/p/8005407.html

https://www.cnblogs.com/yinghuali/p/7506073.html#top

https://blog.csdn.net/qq_37423198/article/details/80524600

http://wiki.jikexueyuan.com/project/tensorflow-zh/how_tos/reading_data.html

https://blog.csdn.net/s_sunnyy/article/details/72924317

转:https://www.cnblogs.com/yanshw/p/10541587.html



推荐阅读
  • 从 .NET 转 Java 的自学之路:IO 流基础篇
    本文详细介绍了 Java 中的 IO 流,包括字节流和字符流的基本概念及其操作方式。探讨了如何处理不同类型的文件数据,并结合编码机制确保字符数据的正确读写。同时,文中还涵盖了装饰设计模式的应用,以及多种常见的 IO 操作实例。 ... [详细]
  • 优化ListView性能
    本文深入探讨了如何通过多种技术手段优化ListView的性能,包括视图复用、ViewHolder模式、分批加载数据、图片优化及内存管理等。这些方法能够显著提升应用的响应速度和用户体验。 ... [详细]
  • 本文介绍了一种根据目标检测结果,从原始XML文件中提取并分析特定类别的方法。通过解析XML文件,筛选出特定类别的图像和标注信息,并保存到新的文件夹中,以便进一步分析和处理。 ... [详细]
  • 本文将介绍如何编写一些有趣的VBScript脚本,这些脚本可以在朋友之间进行无害的恶作剧。通过简单的代码示例,帮助您了解VBScript的基本语法和功能。 ... [详细]
  • 本文探讨了Hive中内部表和外部表的区别及其在HDFS上的路径映射,详细解释了两者的创建、加载及删除操作,并提供了查看表详细信息的方法。通过对比这两种表类型,帮助读者理解如何更好地管理和保护数据。 ... [详细]
  • 深入理解Tornado模板系统
    本文详细介绍了Tornado框架中模板系统的使用方法。Tornado自带的轻量级、高效且灵活的模板语言位于tornado.template模块,支持嵌入Python代码片段,帮助开发者快速构建动态网页。 ... [详细]
  • 1.如何在运行状态查看源代码?查看函数的源代码,我们通常会使用IDE来完成。比如在PyCharm中,你可以Ctrl+鼠标点击进入函数的源代码。那如果没有IDE呢?当我们想使用一个函 ... [详细]
  • 本文详细介绍了Java编程语言中的核心概念和常见面试问题,包括集合类、数据结构、线程处理、Java虚拟机(JVM)、HTTP协议以及Git操作等方面的内容。通过深入分析每个主题,帮助读者更好地理解Java的关键特性和最佳实践。 ... [详细]
  • 解读MySQL查询执行计划的详细指南
    本文旨在帮助开发者和数据库管理员深入了解如何解读MySQL查询执行计划。通过详细的解析,您将掌握优化查询性能的关键技巧,了解各种访问类型和额外信息的含义。 ... [详细]
  • 本文探讨了 Objective-C 中的一些重要语法特性,包括 goto 语句、块(block)的使用、访问修饰符以及属性管理等。通过实例代码和详细解释,帮助开发者更好地理解和应用这些特性。 ... [详细]
  • 尽管使用TensorFlow和PyTorch等成熟框架可以显著降低实现递归神经网络(RNN)的门槛,但对于初学者来说,理解其底层原理至关重要。本文将引导您使用NumPy从头构建一个用于自然语言处理(NLP)的RNN模型。 ... [详细]
  • MySQL索引详解与优化
    本文深入探讨了MySQL中的索引机制,包括索引的基本概念、优势与劣势、分类及其实现原理,并详细介绍了索引的使用场景和优化技巧。通过具体示例,帮助读者更好地理解和应用索引以提升数据库性能。 ... [详细]
  • 毕业设计:基于机器学习与深度学习的垃圾邮件(短信)分类算法实现
    本文详细介绍了如何使用机器学习和深度学习技术对垃圾邮件和短信进行分类。内容涵盖从数据集介绍、预处理、特征提取到模型训练与评估的完整流程,并提供了具体的代码示例和实验结果。 ... [详细]
  • 深入解析Java虚拟机(JVM)架构与原理
    本文旨在为读者提供对Java虚拟机(JVM)的全面理解,涵盖其主要组成部分、工作原理及其在不同平台上的实现。通过详细探讨JVM的结构和内部机制,帮助开发者更好地掌握Java编程的核心技术。 ... [详细]
  • 在高并发需求的C++项目中,我们最初选择了JsonCpp进行JSON解析和序列化。然而,在处理大数据量时,JsonCpp频繁抛出异常,尤其是在多线程环境下问题更为突出。通过分析发现,旧版本的JsonCpp存在多线程安全性和性能瓶颈。经过评估,我们最终选择了RapidJSON作为替代方案,并实现了显著的性能提升。 ... [详细]
author-avatar
Fier田野莎莎
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有