热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Python实现大文件排序的方法

这篇文章主要介绍了Python大文件排序的方法,涉及Python针对文件、缓存及日期等操作的相关技巧,具有一定参考借鉴价值,需要的朋友可以参考下
本文实例讲述了Python实现大文件排序的方法。分享给大家供大家参考。具体实现方法如下:

import gzip
import os
from multiprocessing import Process, Queue, Pipe, current_process, freeze_support
from datetime import datetime
def sort_worker(input,output):
 while True:
  lines = input.get().splitlines()
  element_set = {}
  for line in lines:
    if line.strip() == 'STOP':
      return
    try:
      element = line.split(' ')[0]
      if not element_set.get(element): element_set[element] = ''
    except:
      pass
  sorted_element = sorted(element_set)
  #print sorted_element
  output.put('\n'.join(sorted_element))
def write_worker(input, pre):
  os.system('mkdir %s'%pre)
  i = 0
  while True:
    cOntent= input.get()
    if content.strip() == 'STOP':
      return
    write_sorted_bulk(content, '%s/%s'%(pre, i))
    i += 1
def write_sorted_bulk(content, filename):
  f = file(filename, 'w')
  f.write(content)
  f.close()
def split_sort_file(filename, num_sort = 3, buf_size = 65536*64*4):
  t = datetime.now()
  pre, ext = os.path.splitext(filename)
  if ext == '.gz':
    file_file = gzip.open(filename, 'rb')
  else:
    file_file = open(filename)
  bulk_queue = Queue(10)
  sorted_queue = Queue(10)
  NUM_SORT = num_sort
  sort_worker_pool = []
  for i in range(NUM_SORT):
    sort_worker_pool.append( Process(target=sort_worker, args=(bulk_queue, sorted_queue)) )
    sort_worker_pool[i].start()
  NUM_WRITE = 1
  write_worker_pool = []
  for i in range(NUM_WRITE):
    write_worker_pool.append( Process(target=write_worker, args=(sorted_queue, pre)) )
    write_worker_pool[i].start()
  buf = file_file.read(buf_size)
  sorted_count = 0
  while len(buf):
    end_line = buf.rfind('\n')
    #print buf[:end_line+1]
    bulk_queue.put(buf[:end_line+1])
    sorted_count += 1
    if end_line != -1:
      buf = buf[end_line+1:] + file_file.read(buf_size)
    else:
      buf = file_file.read(buf_size)
  for i in range(NUM_SORT):
    bulk_queue.put('STOP')
  for i in range(NUM_SORT):
    sort_worker_pool[i].join()
   
  for i in range(NUM_WRITE):
    sorted_queue.put('STOP')
  for i in range(NUM_WRITE):
    write_worker_pool[i].join()
  print 'elasped ', datetime.now() - t
  return sorted_count
from heapq import heappush, heappop
from datetime import datetime
from multiprocessing import Process, Queue, Pipe, current_process, freeze_support
import os
class file_heap:
  def __init__(self, dir, idx = 0, count = 1):
    files = os.listdir(dir)
    self.heap = []
    self.files = {}
    self.bulks = {}
    self.pre_element = None
    for i in range(len(files)):
      file = files[i]
      if hash(file) % count != idx: continue
      input = open(os.path.join(dir, file))
      self.files[i] = input
      self.bulks[i] = ''
      heappush(self.heap, (self.get_next_element_buffered(i), i))
  def get_next_element_buffered(self, i):
    if len(self.bulks[i]) <256:
      if self.files[i] is not None:
        buf = self.files[i].read(65536)
        if buf:
          self.bulks[i] += buf
        else:
          self.files[i].close()
          self.files[i] = None
    end_line = self.bulks[i].find('\n')
    if end_line == -1:
      end_line = len(self.bulks[i])
    element = self.bulks[i][:end_line]
    self.bulks[i] = self.bulks[i][end_line+1:]
    return element
  def poppush_uniq(self):
    while True:
      element = self.poppush()
      if element is None:
        return None
      if element != self.pre_element:
        self.pre_element = element
        return element
  def poppush(self):
    try:
      element, index = heappop(self.heap)
    except IndexError:
      return None
    new_element = self.get_next_element_buffered(index)
    if new_element:
      heappush(self.heap, (new_element, index))
    return element
def heappoppush(dir, queue, idx = 0, count = 1):
  heap = file_heap(dir, idx, count)
  while True:
    d = heap.poppush_uniq()
    queue.put(d)
    if d is None: return
def heappoppush2(dir, queue, count = 1):
  heap = []
  procs = []
  queues = []
  pre_element = None
  for i in range(count):
    q = Queue(1024)
    q_buf = queue_buffer(q)
    queues.append(q_buf)
    p = Process(target=heappoppush, args=(dir, q_buf, i, count))
    procs.append(p)
    p.start()
  queues = tuple(queues)
  for i in range(count):
    heappush(heap, (queues[i].get(), i))
  while True:
    try:
      d, i= heappop(heap)
    except IndexError:
      queue.put(None)
      for p in procs:
        p.join()
      return
    else:
      if d is not None:
        heappush(heap,(queues[i].get(), i))
        if d != pre_element:
          pre_element = d
          queue.put(d)
def merge_file(dir):
  heap = file_heap( dir )
  os.system('rm -f '+dir+'.merge')
  fmerge = open(dir+'.merge', 'a')
  element = heap.poppush_uniq()
  fmerge.write(element+'\n')
  while element is not None:
    element = heap.poppush_uniq()
    fmerge.write(element+'\n')
class queue_buffer:
  def __init__(self, queue):
    self.q = queue
    self.rbuf = []
    self.wbuf = []
  def get(self):
    if len(self.rbuf) == 0:
      self.rbuf = self.q.get()
    r = self.rbuf[0]
    del self.rbuf[0]
    return r
  def put(self, d):
    self.wbuf.append(d)
    if d is None or len(self.wbuf) > 1024:
      self.q.put(self.wbuf)
      self.wbuf = []
def diff_file(file_old, file_new, file_diff, buf = 268435456):
  print 'buffer size', buf
  from file_split import split_sort_file
  os.system('rm -rf '+ os.path.splitext(file_old)[0] )
  os.system('rm -rf '+ os.path.splitext(file_new)[0] )
  t = datetime.now()
  split_sort_file(file_old,5,buf)
  split_sort_file(file_new,5,buf)
  print 'split elasped ', datetime.now() - t
  os.system('cat %s/* | wc -l'%os.path.splitext(file_old)[0])
  os.system('cat %s/* | wc -l'%os.path.splitext(file_new)[0])
  os.system('rm -f '+file_diff)
  t = datetime.now()
  zdiff = open(file_diff, 'a')
  old_q = Queue(1024)
  new_q = Queue(1024)
  old_queue = queue_buffer(old_q)
  new_queue = queue_buffer(new_q)
  h1 = Process(target=heappoppush2, args=(os.path.splitext(file_old)[0], old_queue, 3))
  h2 = Process(target=heappoppush2, args=(os.path.splitext(file_new)[0], new_queue, 3))
  h1.start(), h2.start()
  old = old_queue.get()
  new = new_queue.get()
  old_count, new_count = 0, 0
  while old is not None or new is not None:
    if old > new or old is None:
      zdiff.write('<'+new+'\n')
      new = new_queue.get()
      new_count +=1
    elif old  '+old+'\n')
      old = old_queue.get()
      old_count +=1
    else:
      old = old_queue.get()
      new = new_queue.get()
  print 'new_count:', new_count
  print 'old_count:', old_count
  print 'diff elasped ', datetime.now() - t
  h1.join(), h2.join()

希望本文所述对大家的Python程序设计有所帮助。

推荐阅读
  • Java 实现二维极点算法
    本文介绍了一种使用 Java 编程语言实现的二维极点算法。该算法用于从一组二维坐标中筛选出极点,适用于需要处理几何图形和空间数据的应用场景。文章不仅详细解释了算法的工作原理,还提供了完整的代码示例。 ... [详细]
  • 使用Python实现余弦相似度计算
    余弦相似度广泛应用于文本分类、图像识别等领域,用于衡量两个向量之间的相似程度。其值域在-1到1之间,数值越接近1表示两向量越相似,完全相同为1;相反方向时为-1;正交或不相关时为0。 ... [详细]
  • Python Django大学生心理健康管理系统开发(含源码、文档)
    本项目包含完整的源代码、设计文档、数据库结构以及详细的安装指南,旨在为计算机专业的学生提供一个全面的心理健康管理系统解决方案。 ... [详细]
  • 本文介绍了如何利用Python的高精度计算库mpmath实现π的100种不同计算方法。通过设置更高的精度和优化的数学函数,这些方法能够提供极其精确的结果。 ... [详细]
  • 本文探讨了为何相同的HTTP请求在两台不同操作系统(Windows与Ubuntu)的机器上会分别返回200 OK和429 Too Many Requests的状态码。我们将分析代码、环境差异及可能的影响因素。 ... [详细]
  • 离线安装Grafana Cloudera Manager插件并监控CDH集群
    本文详细介绍如何离线安装Cloudera Manager (CM) 插件,并通过Grafana监控CDH集群的健康状况和资源使用情况。该插件利用CM提供的API接口进行数据获取和展示。 ... [详细]
  • CSS高级技巧:动态高亮当前页面导航
    本文介绍了如何使用CSS实现网站导航栏中当前页面的高亮显示,提升用户体验。通过为每个页面的body元素添加特定ID,并结合导航项的类名,可以轻松实现这一功能。 ... [详细]
  • 本文详细介绍了如何下载并安装 Python,包括选择合适的版本、执行安装程序以及设置环境变量的步骤。此外,还提供了测试安装是否成功的简单方法。 ... [详细]
  • 随着生活节奏的加快和压力的增加,越来越多的人感到不快乐。本文探讨了现代社会中导致人们幸福感下降的各种因素,并提供了一些改善建议。 ... [详细]
  • 你根本不会用百度
    本文转载自第2大脑,详情可以扫描下方二维码关注该公众号摘要:教你正确使用百度。想必你的朋友圈这两天应该被《搜索引擎百度已死》这篇文章刷屏了吧࿰ ... [详细]
  • Python中HOG图像特征提取与应用
    本文介绍如何在Python中使用HOG(Histogram of Oriented Gradients)算法进行图像特征提取,探讨其在目标检测中的应用,并详细解释实现步骤。 ... [详细]
  • Python技巧:利用Cookie实现自动登录绕过验证码
    本文详细介绍了如何通过Python和Selenium库利用浏览器Cookie实现自动登录,从而绕过验证码验证。文章提供了具体的操作步骤,并附有代码示例,帮助读者理解和实践。 ... [详细]
  • Python包管理工具pip的使用指南
    本文详细介绍了如何使用pip进行Python包的安装、管理和常见问题的解决方法,特别针对国内用户提供了优化建议。 ... [详细]
  • Python + Pytest 接口自动化测试中 Token 关联登录的实现方法
    本文将深入探讨 Python 和 Pytest 在接口自动化测试中如何实现 Token 关联登录,内容详尽、逻辑清晰,旨在帮助读者掌握这一关键技能。 ... [详细]
  • Python 工具推荐 | PyHubWeekly 第二十一期:提升命令行体验的五大工具
    本期 PyHubWeekly 为大家精选了 GitHub 上五个优秀的 Python 工具,涵盖金融数据可视化、终端美化、国际化支持、图像增强和远程 Shell 环境配置。欢迎关注并参与项目。 ... [详细]
author-avatar
航模特异_831
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有