热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

tf多线程读取数据

多线程读取数据的机制tf中多线程读取数据跟常规的python多线程思路一致,是基于Queue的多线程编程。主线程读取数据,然后计算,在读数

多线程读取数据的机制

tf中多线程读取数据跟常规的python多线程思路一致,是基于Queue的多线程编程。

主线程读取数据,然后计算,在读数据这部分有两个线程,一个线程读取文件名,生成文件名队列,另一个线程从文件名队列中获取文件名,并读取相应文件,生成数据队列。

图示如下

tensorflow 在队列中加入“结束”标记符,当读取线程检测到该标记符时,会抛出异常OutOfRange,后续代码会捕捉该异常,从而结束线程。

 

读取文件的相应函数

tf.train.string_input_producer(filelist, shuffle, num_epochs)

用于生成文件名队列,3个参数,filelist代表文件名list,num_epochs表示epochs数,shuffle是指在一个epoch内,文件的顺序是否被打乱。

reader对象

reader对象由文件类型对应的读取方法生成,该对象会自动读取文件,并创建数据队列,输出key/文件名,value/文件内容

tf.train.start_queue_runners

注意,在调用tf.train.string_input_producer后,文件名并没有被真正加入文件名队列,而只是创建了一个空队列,此时如果直接计算,系统会陷入阻塞状态。

此时需要启动队列,就是调用tf.train.start_queue_runners。

 

读取图片示例

# encoding:utf-8
__author__ = 'HP'
import tensorflow as tf# 新建一个Session
with tf.Session() as sess:# 文件名listfilename = ['2.png', '3.png']# string_input_producer会产生一个文件名队列filename_queue = tf.train.string_input_producer(filename, shuffle=False, num_epochs=2)# reader从文件名队列中读数据。对应的方法是reader.readreader = tf.WholeFileReader()key, value = reader.read(filename_queue)# tf.train.string_input_producer定义了一个epoch变量,要对它进行初始化
tf.local_variables_initializer().run()# 使用start_queue_runners之后,才会开始填充队列threads = tf.train.start_queue_runners(sess=sess)i = 0while True:i += 1# 获取图片数据并保存image_data = sess.run(value)with open('test_%d.jpg' % i, 'wb') as f:f.write(image_data)

输出四张图片

两张图片,两个epoch,生成了4张图片。(图片每次读取全部数据)

 

队列

上面提到各种队列,tf与python一样,有多种队列

tf.FIFOQueue  先入先出

tf.RandomShuffleQueue  随机出队

tf.PaddingFIFOQueue   以固定长度批量出列的队列

tf.PriorityQueue   带优先级出列的队列

使用逻辑都类似

 

tf.FIFOQueue(capacity, dtypes, shapes=None, names=None ...)

 

简单例子 (需要理解python的队列使用)

import tensorflow as tftf.InteractiveSession()q = tf.FIFOQueue(2, "float") # 最多2个元素
init = q.enqueue_many(([0,0],)) # 初始化队列

x
= q.dequeue() # get
y = x+1
q_inc
= q.enqueue([y]) # put

init.run()
## 初始化队列[0, 0]
#
print(x.eval()) # 0.0
#
print(x.eval()) # 0.0

q_inc.run()
## get 0 +1 put 1, 队列变成 [0, 1]
q_inc.run() ## get 0 +1 put 1,队列变成 [1, 1]
q_inc.run() ## get 1 +1 put 2,队列变成 [1, 2]
print(x.eval()) ## get 1
print(x.eval()) # get 2.0
x.eval() # 阻塞

enqueue_many 生成队列,enqueue put元素,dequeue get元素

 

 tf多线程

tf提供了两个类来实现多线程协同的功能。分别是 tf.Coordinator() and tf.QueueRunner()。

 

tf.Coordinator()

tf.Coordinator()主要用于协同多个线程一起停止,包括  should_stop、  request_stop、 join 三个接口。

实现机制:

在启动线程前,先创建Coordinator类的实例对象coord,并将coord传给每一个线程,每个线程要不断检测这个实例的 should_stop 方法,如果返回True,就停止,

并且可以启动这个实例的 request_stop 方法(任意线程都可随时启动这个方法),这个方法会通知其他线程,一起结束,而且一旦这个方法被调用,should_stop方法就会返回True,其他线程都会结束。

 

示例代码

__author__ = 'HP'
# coding utf-8
import tensorflow as tf
import numpy as np
import threading
import time# 线程中运行的程序,这个程序每隔1秒判断是否需要停止并打印自己的ID。
def MyLoop(coord, worker_id):# 使用tf.Coordinator类提供的协同工具判断当前线程是否需要停止并打印自己的IDwhile not coord.should_stop():# 人为制造一个停止的条件if np.random.rand() <0.1:print(&#39;Stoping from id: %d\n&#39; % worker_id)# 调用coord.request_stop()函数来通知其他线程停止
coord.request_stop()else:# 打印当前线程的IDprint(&#39;Working on id: %d\n&#39; % worker_id)# 暂停1秒time.sleep(1)# 声明一个tf.train.Coordinator类来协同多个线程
coord &#61; tf.train.Coordinator()
# 声明创建5个线程
threads &#61; [threading.Thread(target&#61;MyLoop, args&#61;(coord, i, )) for i in range(5)]
# 启动所有的线程
for t in threads: t.start()
# 等待所有线程退出
coord.join(threads)

输出

可以看到&#xff0c;一旦一个线程结束了&#xff0c;其他线程也跟着结束。

 

tf.QueueRunner()

tf.QueueRunner()用来同时启动多个线程操作同一个队列。并借助 tf.Coordinator()对线程进行管理。

 

示例代码

import tensorflow as tf# 声明一个先进先出的队列&#xff0c;队列中最多n个元素&#xff0c;类型
queue &#61; tf .FIFOQueue(10, &#39;float&#39;)
# 定义队列的入队操作
enqueue_op &#61; queue.enqueue([tf.random_normal([1])])# 使用 tf.train.QueueRunner来创建多个线程运行队列的入队操作
#
tf.train.QueueRunner给出了被操作的队列&#xff0c;[enqueue_op] * 5
#
表示了需要启动5个线程&#xff0c;每个线程中运行的是enqueue_op操作
qr &#61; tf.train.QueueRunner(queue, [enqueue_op] * 5)
# 将定义过的QueueRunner加入TensorFlow计算图上指定的集合
#
tf.train.add_queue_runner函数没有指定集合&#xff0c;
#
则加入默认集合tf.GraphKeys.QUEUE_RUNNERS。
#
下面的函数就是将刚刚定义的qr加入默认的tf.GraphKeys.QUEUE_RUNNERS结合
tf.train.add_queue_runner(qr)
# 定义出队操作
out_tensor &#61; queue.dequeue()with tf.Session() as sess:# 使用tf.train.Coordinator来协同启动的线程coord &#61; tf.train.Coordinator()# 使用tf.train.QueueRunner时&#xff0c;需要明确调用tf.train.start_queue_runners# 来启动所有线程。否则因为没有线程运行入队操作&#xff0c;当调用出队操作时&#xff0c;程序一直等待# 入队操作被运行。tf.train.start_queue_runners函数会默认启动# tf.GraphKeys.QUEUE_RUNNERS中所有QueueRunner.因为这个函数只支持启动指定集合中的QueueRunner,# 所以一般来说tf.train.add_queue_runner函数和tf.train.start_queue_runners函数会指定同一个结合threads &#61; tf.train.start_queue_runners(sess&#61;sess, coord&#61;coord)# 获取队列中的取值for _ in range(11): print(sess.run(out_tensor)[0])# 使用tf.train.Coordinator来停止所有线程
coord.request_stop()coord.join(threads)

同时启动多个线程&#xff0c;每个线程每次运行都将一个随机数写入队列&#xff0c;所以每次都能取到一个随机数。

 

用法小结

1. 创建多线程&#xff0c;方法是 tf.QueueRunner(aim_queue&#xff0c; [operation] * thread_num)&#xff0c;目标队列&#xff0c;创建n个线程&#xff0c;执行某操作

2. 然后将多线程加入 tensorflow 节点&#xff0c;

3. 然后显示的调用 start_queue_runners 启动所有线程

 

tf 多线程读取文件

csv

import tensorflow as tf# 文件名队列
filename_queue &#61; tf.train.string_input_producer(["xx1.csv", "xx2.csv"], shuffle&#61;False)# reader对象 &#xff08;文件阅读器&#xff09;
reader &#61; tf.TextLineReader()
key, value
&#61; reader.read(filename_queue)# 为数据设定默认格式&#xff0c;如果出现空值&#xff0c;就替换为这种格式的默认值。
#
注意格式必须一样&#xff0c;因为输出的格式是统一的&#xff0c;如下
#
[array([ 4. , 0. , 34.322323, 1. ], dtype&#61;float32)]
record_defaults &#61; [[1.], [1.], [1.], [1.]]
col1, col2, col3, col4
&#61; tf.decode_csv(value, record_defaults&#61;record_defaults) # decode_csv
features &#61; tf.concat([[col1], [col2], [col3], [col4]], 0)with tf.Session() as sess:# Start populating the filename queue.coord &#61; tf.train.Coordinator()threads &#61; tf.train.start_queue_runners(coord&#61;coord) # 启动队列&#xff0c;相当于是启动了多个线程&#xff0c;并将coord传入每个线程for i in range(12):example &#61; sess.run([features])print(example)coord.request_stop() #
coord.join(threads) # 等待结束

文件阅读器每次从文件内读取一行&#xff0c;如果有空值&#xff0c;就根据默认格式自动填补&#xff0c;decode_csv 将读取内容解析成张量。

将上述代码与QueueRunner代码对比&#xff0c;不难发现&#xff0c;其实 string_input_producer 生成的就是一个 QueueRunner

 

 

 参考资料&#xff1a;

http://www.cnblogs.com/demian/p/8005407.html

https://www.cnblogs.com/yinghuali/p/7506073.html#top

https://blog.csdn.net/qq_37423198/article/details/80524600

http://wiki.jikexueyuan.com/project/tensorflow-zh/how_tos/reading_data.html

https://blog.csdn.net/s_sunnyy/article/details/72924317

转:https://www.cnblogs.com/yanshw/p/10541587.html



推荐阅读
  • 普通树(每个节点可以有任意数量的子节点)级序遍历 ... [详细]
  • Python多线程编程技巧与实战应用详解 ... [详细]
  • 使用Tkinter构建51Ape无损音乐爬虫UI
    本文介绍了如何使用Python的内置模块Tkinter来构建一个简单的用户界面,用于爬取51Ape网站上的无损音乐百度云链接。虽然Tkinter入门相对简单,但在实际开发过程中由于文档不足可能会带来一些不便。 ... [详细]
  • 本文详细介绍了如何使用Python的多进程技术来高效地分块读取超大文件,并将其输出为多个文件。通过这种方式,可以显著提高读取速度和处理效率。 ... [详细]
  • 探讨Redis的最佳应用场景
    本文将深入探讨Redis在不同场景下的最佳应用,包括其优势和适用范围。 ... [详细]
  • 本文介绍了如何使用Python的Paramiko库批量更新多台服务器的登录密码。通过示例代码展示了具体实现方法,确保了操作的高效性和安全性。Paramiko库提供了强大的SSH2协议支持,使得远程服务器管理变得更加便捷。此外,文章还详细说明了代码的各个部分,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 在Windows系统中安装TensorFlow GPU版的详细指南与常见问题解决
    在Windows系统中安装TensorFlow GPU版是许多深度学习初学者面临的挑战。本文详细介绍了安装过程中的每一个步骤,并针对常见的问题提供了有效的解决方案。通过本文的指导,读者可以顺利地完成安装并避免常见的陷阱。 ... [详细]
  • iOS snow animation
    CTSnowAnimationView.hCTMyCtripCreatedbyalexon1614.Copyright©2016年ctrip.Allrightsreserved.# ... [详细]
  • 本文详细探讨了使用Python3编写爬虫时如何应对网站的反爬虫机制,通过实例讲解了如何模拟浏览器访问,帮助读者更好地理解和应用相关技术。 ... [详细]
  • 本文将深入探讨 iOS 中的 Grand Central Dispatch (GCD),并介绍如何利用 GCD 进行高效多线程编程。如果你对线程的基本概念还不熟悉,建议先阅读相关基础资料。 ... [详细]
  • 本文介绍了如何使用 Google Colab 的免费 GPU 资源进行深度学习应用开发。Google Colab 是一个无需配置即可使用的云端 Jupyter 笔记本环境,支持多种深度学习框架,并且提供免费的 GPU 计算资源。 ... [详细]
  • Python多线程详解与示例
    本文介绍了Python中的多线程编程,包括僵尸进程和孤儿进程的概念,并提供了具体的代码示例。同时,详细解释了0号进程和1号进程在系统中的作用。 ... [详细]
  • 本文详细介绍了Linux系统中用于管理IPC(Inter-Process Communication)资源的两个重要命令:ipcs和ipcrm。通过这些命令,用户可以查看和删除系统中的消息队列、共享内存和信号量。 ... [详细]
  • Java高并发与多线程(二):线程的实现方式详解
    本文将深入探讨Java中线程的三种主要实现方式,包括继承Thread类、实现Runnable接口和实现Callable接口,并分析它们之间的异同及其应用场景。 ... [详细]
  • Python 程序转换为 EXE 文件:详细解析 .py 脚本打包成独立可执行文件的方法与技巧
    在开发了几个简单的爬虫 Python 程序后,我决定将其封装成独立的可执行文件以便于分发和使用。为了实现这一目标,首先需要解决的是如何将 Python 脚本转换为 EXE 文件。在这个过程中,我选择了 Qt 作为 GUI 框架,因为之前对此并不熟悉,希望通过这个项目进一步学习和掌握 Qt 的基本用法。本文将详细介绍从 .py 脚本到 EXE 文件的整个过程,包括所需工具、具体步骤以及常见问题的解决方案。 ... [详细]
author-avatar
Fier田野莎莎
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有