热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用pymongo来操作mongodb数据库

本文介绍mongodb的基本使用,常用操作.主要讲pymongo的使用,同时必要的时候会说一些源码的以及注意事项.涉及主要说了一些常见的问题,monggodb中经常用过的查询操作.

本文介绍 mongodb 的基本使用,常用操作.主要讲 pymongo 的使用, 同时必要的时候会说一些 源码的 以及注意事项.

涉及主要说了一些常见的问题, monggodb 中经常用过的查询操作.

  • and or 用法
  • 排序操作
  • 工具类
  • in 查询
  • skip ,offset 操作
  • cursor 介绍
  • - 遇到错误 相关错误

1 根据mongo_id 查询文档


#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Time    : 2019/3/23 21:39
@File    : test_pymogo.py
@Author  : [email protected]

按照 object_id  查询 document

"""

from pymongo import MongoClient
# mongo URI  连接配置 
from config.DB import SHOUFUYOU_REPORTING_URI, SHOUFUYOU_REPORTING_DB_NAME

# 导入这个包
from bson.objectid import ObjectId

# 通过uri 连接 pymongo 生成 client 
client = MongoClient(SHOUFUYOU_REPORTING_URI)

# 获取 db 通过名称 获取db .
mongo_db = client[SHOUFUYOU_REPORTING_DB_NAME]

# 获取collection
call_record = mongo_db['callRecord']


if __name__ == '__main__':
    # 查询条件, 相当于 where 后面跟的条件
    # filter_ = {'_id': ObjectId('5be2b43da90ec1470078ef53')}
    filter_ = {'_id': ObjectId('5be2b43da90ec1470078ef50')}

    # 过滤字段, 需要筛选出来 你想要的字段, 相当于 select 后面跟的 字段,
    #  格式 '字段名':1 显示, '字段名':0 不显示. 默认 是都显示出来, 如果指定了字段 则根据指定条件 ***显示.
    projection = {'source_type': 1, '_id': 1}

    # 根据mongo_id  查询数据, 如果没有返回 None
    document = call_record.find_one(filter=filter_, projection=projection)

    print(document)
    #结果  {'_id': ObjectId('5be2b43da90ec1470078ef53'), 'source_type': 'android'}

通过 URI 连接 到mongodb, 之后获取db, 最后 获取collection 就可以了. 之后 就可以 find取查询 数据库了.

get_database参考这个文档 http://api.mongodb.com/python/current/tutorial.html#getting-a-database

注意这里用的 是 find_one 这个方法 只是用来查询确定一条文档,才会使用. 一般 情况下 会使用 find 这个命令 会多一些.

举个简单的例子吧 .

cursor.find 的用法

find 的使用,在 mongodb 查询 find 用的是最多的.

find 返回 结果是一个cursor , 如果没有结果就会 None .

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Time    : 2019/3/23 21:39
@File    : test_pymogo.py
@Author  : [email protected]

find  基本用法 in

"""

from pymongo import MongoClient
# mongo URI  连接配置
from config.DB import SHOUFUYOU_REPORTING_URI, SHOUFUYOU_REPORTING_DB_NAME

# 导入这个包
from bson.objectid import ObjectId

client = MongoClient(SHOUFUYOU_REPORTING_URI)

# 获取 db
mongo_db = client[SHOUFUYOU_REPORTING_DB_NAME]

# 获取collection
call_record = mongo_db['callRecord']

if __name__ == '__main__':

    mongo_id_list_test = [
        # 数据 mongo_id
        ObjectId("5be2b43da90ec1470078ef53"),
        ObjectId("5be3ec1da90ec146d71b551f"),
        ObjectId("5be422eba90ec106a54840b2")

    ]

    # mongodb  in 查询, 查询条件
    filter_ = {"_id": {"$in": mongo_id_list_test}}
    
    # 筛选字段 
    projection = {
        '_id': 1,
        'created_time': 1,
    }

    # cursor  注意 find 并不会返回文档, 而是返回一个cursor 对象
    documents = call_record.find(filter_, projection)

    print(f"documents:{documents}")

    # 需要迭代对象,才能取到值.
    for doc in documents:
        print(doc)

结果如下:

documents:
{'_id': ObjectId('5be2b43da90ec1470078ef53'), 'created_time': '2018-11-07 17:45:33'}
{'_id': ObjectId('5be3ec1da90ec146d71b551f'), 'created_time': '2018-11-08 15:56:13'}
{'_id': ObjectId('5be422eba90ec106a54840b2'), 'created_time': '2018-11-08 19:50:03'}

来说下 find 的参数. find 参数 还是挺多的.
这里只说几个比较重要的.

  • filter 第一个位置 参数 就是筛选条件
  • projection 第二个 位置参数 筛选字段
  • no_cursor_timeout 判断cursor 是否超时,默认是False ,永不超时
1 find 中and 的用法

and 语法 如下 :

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Time    : 2019/3/23 21:39
@File    : test_pymogo.py
@Author  : [email protected]

find  基本用法 and  用法 



use xinyongfei_rcs_gateway;

db.getCollection("fudataLog").find(
    {
        "$and" : [
            {
                "status" : "SUCCESS"
            },
            {
                "created_time" : {
                    "$gte" : "2018-05-22 17:18:45"
                }
            },
            {
                "created_time" : {
                    "$lt" : "2018-05-29 17:18:45"
                }
            },
            {
                "status" : "SUCCESS"
            }
        ]
    }
);




"""
from pymongo import MongoClient
# mongo URI  连接配置
from config.DB import XINYONGFEI_RCS_GATEWAY_URI, XINYONGFEI_RCS_GATEWAY_DB_NAME


def test_mongo_between():
    """
    { $and: [ { "created_time": { $gte: "2018-05-22 16:31:05" } },
        { "created_time": { $lt: "2018-05-25 16:31:05" } }, { "method_id": "commerceReportPull" } ]
    }
    :return:
    """

    _uri = XINYONGFEI_RCS_GATEWAY_URI
    _dbname = XINYONGFEI_RCS_GATEWAY_DB_NAME

    collecion_name = 'fudataLog'

    client = MongoClient(_uri)
    db = client[_dbname]
    collecion = db[collecion_name]

    # 查询 时间段 是在   '2018-05-22 16:31:05' <=create_time <'2018-05-30 16:31:05'
    # 并且  method_id = commerceReportPull , status = SUCCESS 的记录
    doamin = {
        "$and": [
            {"created_time": {"$lt": "2018-10-30 16:31:05"}},
            {"created_time": {"$gte": "2018-10-17 18:13:12"}},
            {"method_id": "commerceGetOpenId"},
            {"status": "SUCCESS"}
        ]
    }

    fields = {"return_data.open_id": 1,
              "created_time": 1,
              'method_id': 1,
              "status": 1,
              # 不显示 mongo_id
              '_id': 0
              }

    cursor = collecion.find(filter=doamin, projection=fields)

    # 查看有多少记录
    print(f"cursor.count():{cursor.count()}")

    # 需要迭代对象,才能取到值.
    for doc in cursor:
        print(doc)


if __name__ == '__main__':
    test_mongo_between()
    pass


结果如下:

cursor.count():14
{'created_time': '2018-10-17 18:13:12', 'method_id': 'commerceGetOpenId', 'status': 'SUCCESS', 'return_data': {'open_id': 'wdw66blejdcb3qhppc0kyo1yqhb6th3vlod0tgl9'}}
{'created_time': '2018-10-17 18:13:12', 'method_id': 'commerceGetOpenId', 'status': 'SUCCESS', 'return_data': {'open_id': 'wdw66blejdcb3qhppc0kyo1yqhb6th3vlod0tgl9'}}
{'created_time': '2018-10-17 18:13:12', 'method_id': 'commerceGetOpenId', 'status': 'SUCCESS', 'return_data': {'open_id': 'wdw66blejdcb3qhppc0kyo1yqhb6th3vlod0tgl9'}}
{'created_time': '2018-10-18 14:18:42', 'method_id': 'commerceGetOpenId', 'status': 'SUCCESS', 'return_data': {'open_id': 'v0fahoarvxwixtu64yxtdhxaxa0x0azhlrt0bhxd'}}
{'created_time': '2018-10-18 14:18:42', 'method_id': 'commerceGetOpenId', 'status': 'SUCCESS', 'return_data': {'open_id': 'v0fahoarvxwixtu64yxtdhxaxa0x0azhlrt0bhxd'}}
{'created_time': '2018-10-18 14:18:42', 'method_id': 'commerceGetOpenId', 'status': 'SUCCESS', 'return_data': {'open_id': 'v0fahoarvxwixtu64yxtdhxaxa0x0azhlrt0bhxd'}}
{'created_time': '2018-10-18 18:59:27', 'method_id': 'commerceGetOpenId', 'status': 'SUCCESS', 'return_data': {'open_id': '929loss67cisw42f8ocvrgonsxwkl5clryvuihlx'}}
{'created_time': '2018-10-26 17:50:39', 'method_id': 'commerceGetOpenId', 'status': 'SUCCESS', 'return_data': {'open_id': 'p5lmxzfgprnhpuvv3pkjlt8iv6wtc9wzevzywk4x'}}
{'created_time': '2018-10-26 17:50:39', 'method_id': 'commerceGetOpenId', 'status': 'SUCCESS', 'return_data': {'open_id': 'p5lmxzfgprnhpuvv3pkjlt8iv6wtc9wzevzywk4x'}}
{'created_time': '2018-10-29 18:20:48', 'method_id': 'commerceGetOpenId', 'status': 'SUCCESS', 'return_data': {'open_id': 'ijhyxce9he3dgsoadt9z377cxcqqwdto3abgiz4w'}}
{'created_time': '2018-10-29 18:20:48', 'method_id': 'commerceGetOpenId', 'status': 'SUCCESS', 'return_data': {'open_id': 'ijhyxce9he3dgsoadt9z377cxcqqwdto3abgiz4w'}}
{'created_time': '2018-10-29 18:20:48', 'method_id': 'commerceGetOpenId', 'status': 'SUCCESS', 'return_data': {'open_id': 'ijhyxce9he3dgsoadt9z377cxcqqwdto3abgiz4w'}}
{'created_time': '2018-10-29 18:44:18', 'method_id': 'commerceGetOpenId', 'status': 'SUCCESS', 'return_data': {'open_id': 'g1zu3wgncengql9dis2u3ghfnqh3ghtjlob4o2mv'}}
{'created_time': '2018-10-29 18:44:18', 'method_id': 'commerceGetOpenId', 'status': 'SUCCESS', 'return_data': {'open_id': 'g1zu3wgncengql9dis2u3ghfnqh3ghtjlob4o2mv'}}

Process finished with exit code 0

注意 and 查询条件 的写法

 doamin = {
        "$and": [
            {"created_time": {"$lt": "2018-10-30 16:31:05"}},
            {"created_time": {"$gte": "2018-10-17 18:13:12"}},
            {"method_id": "commerceGetOpenId"},
            {"status": "SUCCESS"}
        ]
    }
2 find 中 or 的用法
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Time    : 2019/3/23 21:39
@File    : test_pymogo.py
@Author  : [email protected]

find  基本用法 or   用法


"""
from pymongo import MongoClient
# mongo URI  连接配置
from config.DB import XINYONGFEI_RCS_GATEWAY_URI, XINYONGFEI_RCS_GATEWAY_DB_NAME


def test_mongo_or():
    """
    or 的使用
    doamin = {
        "$or": [
            {"user_id": "99063974"},
            {"user_id": "99063770"},
        ]
    }


    :return:
    """

    _uri = XINYONGFEI_RCS_GATEWAY_URI
    _dbname = XINYONGFEI_RCS_GATEWAY_DB_NAME

    collection_name = 'fudataLog'

    client = MongoClient(_uri)
    db = client[_dbname]
    collection = db[collection_name]

    doamin = {
        "$or": [
            {"user_id": "99063974"},
            {"user_id": "99063770"},
        ]
    }

    fields = {
        "user_id": 1,
        "status": 1,
        # 不显示 mongo_id
        '_id': 0
    }

    cursor = collection.find(filter=doamin, projection=fields)

    # 查看有多少记录
    print(f"cursor.count():{cursor.count()}")

    # 需要迭代对象,才能取到值.
    for doc in cursor:
        print(doc)


if __name__ == '__main__':
    test_mongo_or()
    pass

结果如下:

cursor.count():34
{'user_id': '99063770', 'status': 'SUCCESS'}
{'user_id': '99063770', 'status': 'SUCCESS'}
{'user_id': '99063770', 'status': 'ERROR'}
{'user_id': '99063770', 'status': 'ERROR'}
{'user_id': '99063974', 'status': 'SUCCESS'}
{'user_id': '99063974', 'status': 'ERROR'}
{'user_id': '99063974', 'status': 'SUCCESS'}
...

这样就可以了,可以看出 这样全部文档 被查找出来了.

主要是 or 的用法, 这里只是把 and 换成了or 其他不变.
这里查询 user_id 是99063974 or 99063770 的文档 .

    doamin = {
        "$or": [
            {"user_id": "99063974"},
            {"user_id": "99063770"},
        ]
    }

可以看出 结果已经 找出来了, 但是结果里面 可能有status 等于error的记录, 我们可不可以拿到全是成功 记录呢, 肯定是可以的. 取结果集中成功的记录. 只要在添加一个条件即可.
看下面的例子:

def test_mongo_or_and():
    """
    and 和 or 的使用
    doamin = {

        "status": "SUCCESS",
        "$or": [
            {"user_id": "99063974"},
            {"user_id": "99063770"},
        ]
    }

    :return:
    """

    _uri = XINYONGFEI_RCS_GATEWAY_URI
    _dbname = XINYONGFEI_RCS_GATEWAY_DB_NAME
    collection_name = 'fudataLog'

    client = MongoClient(_uri)
    db = client[_dbname]
    collection = db[collection_name]

    doamin = {

        "status": "SUCCESS",
        "$or": [
            {"user_id": "99063974"},
            {"user_id": "99063770"},
        ]
    }

    fields = {
        "user_id": 1,
        "status": 1,
        # 不显示 mongo_id
        '_id': 0
    }

    cursor = collection.find(filter=doamin, projection=fields)

    # 查看有多少记录
    print(f"cursor.count():{cursor.count()}")

    # 需要迭代对象,才能取到值.
    for doc in cursor:
        print(doc)

结果如下:

cursor.count():6
{'user_id': '99063770', 'status': 'SUCCESS'}
{'user_id': '99063770', 'status': 'SUCCESS'}
{'user_id': '99063770', 'status': 'SUCCESS'}
{'user_id': '99063974', 'status': 'SUCCESS'}
{'user_id': '99063974', 'status': 'SUCCESS'}
{'user_id': '99063974', 'status': 'SUCCESS'}

mongodb 常用的一些查询:


#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Time    : 2019/3/23 21:39
@File    : test_pymongo_condition.py
@Author  : [email protected]


条件查询 :



1 范围查询: 按照时间范围 查询, 按照 user_id  查询 某一范围的数据.

filter_ = {
    #  查询时间范围,并且 user_id='99063857' 并且时间返回为 下面之间的数据
    "$and": [
        {"created_time": {"$lte": '2018-12-07 15:25:43'}},
        {"created_time": {"$gt": '2018-09-01 16:00:30'}},
        {'user_id': "99063857"}

    ]

}


2 TODO 



"""

from pymongo import MongoClient

from config.DB import SHOUFUYOU_REPORTING_URI, SHOUFUYOU_REPORTING_DB_NAME

client = MongoClient(SHOUFUYOU_REPORTING_URI)

# 获取 db
mongo_db = client[SHOUFUYOU_REPORTING_DB_NAME]

# 获取collection
call_record = mongo_db['callRecord']

fields = {'_id': 0, 'created_time': 1, "user_id": 1}
filter_ = {
    #  查询时间范围,并且 user_id='99063857' 并且时间返回为 下面之间的数据
    "$and": [
        {"created_time": {"$lte": '2018-12-07 15:25:43'}},
        {"created_time": {"$gt": '2018-09-01 16:00:30'}},
        {'user_id': "99063857"}

    ]

}

cursor = call_record.find(filter=filter_, projection=fields).limit(5)

print(cursor.count())

for doc in cursor:
    print(doc)
3 find 中 in 操作
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Time    : 2019/3/23 21:39
@File    : test_pymogo.py
@Author  : [email protected]

find  基本用法 in

# mongodb  in 查询
mongo_id_list_test = [
    # 数据 mongo_id
    ObjectId("5be2b43da90ec1470078ef53"),
    ObjectId("5be3ec1da90ec146d71b551f"),
    ObjectId("5be422eba90ec106a54840b2")

]
filter_ = {"_id": {"$in": mongo_id_list_test}}
"""

from pymongo import MongoClient
# mongo URI  连接配置
from config.DB import SHOUFUYOU_REPORTING_URI, SHOUFUYOU_REPORTING_DB_NAME

# 导入这个包
from bson.objectid import ObjectId

client = MongoClient(SHOUFUYOU_REPORTING_URI)

# 获取 db
mongo_db = client[SHOUFUYOU_REPORTING_DB_NAME]

# 获取collection
call_record = mongo_db['callRecord']

if __name__ == '__main__':

    mongo_id_list_test = [
        # 数据 mongo_id
        ObjectId("5be2b43da90ec1470078ef53"),
        ObjectId("5be3ec1da90ec146d71b551f"),
        ObjectId("5be422eba90ec106a54840b2")

    ]
    # mongodb  in 查询
    filter_ = {"_id": {"$in": mongo_id_list_test}}

    projection = {
        '_id': 1,
        'created_time': 1,
    }

    # cursor  注意 find 并不会返回文档, 而是返回一个cursor 对象
    documents = call_record.find(filter_, projection)

    print(f"documents:{documents}")

    # 需要迭代对象,才能取到值.
    for doc in documents:
        print(doc)
4 find 中的排序操作

pymongo.ASCENDING 升序
pymongo.DESCENDING 降序

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Time    : 2019/3/23 21:39
@File    : test_pymongo_condition.py
@Author  : [email protected]


排序操作 :

pymongo.ASCENDING  升序
pymongo.DESCENDING  降序


for doc in collection.find().sort('field', pymongo.ASCENDING):
    print(doc)




for doc in collection.find().sort([
        ('field1', pymongo.ASCENDING),
        ('field2', pymongo.DESCENDING)]):
    print(doc)



"""
import pymongo
from pymongo import MongoClient

from config.DB import SHOUFUYOU_REPORTING_URI, SHOUFUYOU_REPORTING_DB_NAME

client = MongoClient(SHOUFUYOU_REPORTING_URI)

# 获取 db
mongo_db = client[SHOUFUYOU_REPORTING_DB_NAME]

# 获取collection
call_record = mongo_db['callRecord']


def test_sort(collection):
    fields = {'_id': 0, 'created_time': 1, "user_id": 1}
    filter_ = {
        #  查询时间范围,并且 user_id='99063857' 并且时间返回为 下面之间的数据
        "$and": [
            {"created_time": {"$lte": '2018-12-07 15:25:43'}},
            {"created_time": {"$gt": '2018-09-01 16:00:30'}},

        ]

    }

    # 按照 create_time 降序排序
    cursor = collection.find(filter=filter_, projection=fields).sort([
        ('created_time', pymongo.DESCENDING),

    ]).limit(10)

    print(cursor.count())

    for doc in cursor:
        print(doc)


def test_sort_multi(collection):
    fields = {'_id': 0, 'created_time': 1, "user_id": 1}
    filter_ = {
        #  查询时间范围,并且 user_id='99063857' 并且时间返回为 下面之间的数据
        "$and": [
            {"created_time": {"$lte": '2018-12-07 15:25:43'}},
            {"created_time": {"$gt": '2018-09-01 16:00:30'}},

        ]

    }

    # 按照 create_time 降序排序
    # 注意这里的排序 是有顺序的,这里是先按照usre_id 升序,之后在按照created_time 降序排序.
    cursor = collection.find(filter=filter_, projection=fields).sort([
        ('user_id', pymongo.ASCENDING),
        ('created_time', pymongo.DESCENDING),

    ]).limit(50)

    print(cursor.count())

    for doc in cursor:
        print(doc)


if __name__ == '__main__':
    # test_sort(call_record)
    test_sort_multi(call_record)
5 find 中 的skip 和limit 操作.

有时候 我们希望可以跳过几个文档, 限制文档的数量. 这个时候 就可以使用 skip 和 limit 来完成这样的操作 ,使用起来也非常方便.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Time    : 2019/4/3 11:56
@File    : test_cursor_skip_limit .py
@Author  : [email protected]
"""

from pymongo import MongoClient

from config.DB import SHOUFUYOU_REPORTING_URI, SHOUFUYOU_REPORTING_DB_NAME

client = MongoClient(SHOUFUYOU_REPORTING_URI, maxPoolSize=50)
mongo_db = client[SHOUFUYOU_REPORTING_DB_NAME]
collection = mongo_db['contacts']

domain = {"event_id": "1000073"}
fields = {'_id': 1, 'created_time': 1, 'event_id': 1}


cursor = collection.find(domain, fields)


# copy 一份 cursor 对象.
cursor_copy = cursor.clone()

values = cursor.skip(3).limit(2)
print(f"count:{values.count()}")
for item in values:
    print(item)

print("--------copy cursor  top 10 document------")

for idx, doc in enumerate(cursor_copy[0:10]):
    print(idx, doc)

结果如下:

count:393
{'_id': ObjectId('5bc86b34a90ec16e6c44dcca'), 'event_id': '1000073', 'created_time': '2018-10-18 19:15:00'}
{'_id': ObjectId('5bc87a85a90ec16e6e222242'), 'event_id': '1000073', 'created_time': '2018-10-18 20:20:21'}
--------copy cursor  top 10 document------
0 {'_id': ObjectId('5bc5ab05a90ec16eb23ee498'), 'event_id': '1000073', 'created_time': '2018-10-16 17:10:29'}
1 {'_id': ObjectId('5bc69975a90ec16ea023e42d'), 'event_id': '1000073', 'created_time': '2018-10-17 10:07:49'}
2 {'_id': ObjectId('5bc712afa90ec16ea20ff19f'), 'event_id': '1000073', 'created_time': '2018-10-17 18:45:03'}
3 {'_id': ObjectId('5bc86b34a90ec16e6c44dcca'), 'event_id': '1000073', 'created_time': '2018-10-18 19:15:00'}
4 {'_id': ObjectId('5bc87a85a90ec16e6e222242'), 'event_id': '1000073', 'created_time': '2018-10-18 20:20:21'}
5 {'_id': ObjectId('5bd27f8ea90ec1277f7e91d1'), 'event_id': '1000073', 'created_time': '2018-10-26 10:44:30'}
6 {'_id': ObjectId('5bd6de89a90ec12779579b77'), 'event_id': '1000073', 'created_time': '2018-10-29 18:18:49'}
7 {'_id': ObjectId('5bd6e416a90ec1278e0a16e8'), 'event_id': '1000073', 'created_time': '2018-10-29 18:42:30'}
8 {'_id': ObjectId('5bd81a1ea90ec127806c7670'), 'event_id': '1000073', 'created_time': '2018-10-30 16:45:18'}
9 {'_id': ObjectId('5be015d8a90ec146e7432850'), 'event_id': '1000073', 'created_time': '2018-11-05 18:05:12'}


从以上的结果可以看出来,skip 3 , limit 2 . 就是下面idx 3 ,4的值.

上面 的写法 也可以这样写:

values = cursor.limit(2).skip(3)

为什么可以这样写呢? 感觉非常像链式编程了. 为什么可以这样随意控制呢?
其实 这里 limit 最后 返回的 也是cursor 对象, skip 返回的也是cursor 对象. 所以 这样 就可以 一直 .skip().limit().skip() 这种方式进行编程.

这两个方法 返回的都是自己 的对象, 也就是对应 代码:

看下 skip 代码, 首先 检查skip 类型,做了一些简单的判断, 之后把 skip 保存到 自己 私有变量里面. self.__skip

   def skip(self, skip):
        """Skips the first `skip` results of this cursor.

        Raises :exc:`TypeError` if `skip` is not an integer. Raises
        :exc:`ValueError` if `skip` is less than ``0``. Raises
        :exc:`~pymongo.errors.InvalidOperation` if this :class:`Cursor` has
        already been used. The last `skip` applied to this cursor takes
        precedence.

        :Parameters:
          - `skip`: the number of results to skip
        """
        if not isinstance(skip, integer_types):
            raise TypeError("skip must be an integer")
        if skip <0:
            raise ValueError("skip must be >= 0")
        self.__check_okay_to_chain()

        self.__skip = skip
        return self
        

limit 的方法实现 其实和skip 是差不多的.

    def limit(self, limit):
        """Limits the number of results to be returned by this cursor.

        Raises :exc:`TypeError` if `limit` is not an integer. Raises
        :exc:`~pymongo.errors.InvalidOperation` if this :class:`Cursor`
        has already been used. The last `limit` applied to this cursor
        takes precedence. A limit of ``0`` is equivalent to no limit.

        :Parameters:
          - `limit`: the number of results to return

        .. mongodoc:: limit
        """
        if not isinstance(limit, integer_types):
            raise TypeError("limit must be an integer")
        if self.__exhaust:
            raise InvalidOperation("Can't use limit and exhaust together.")
        self.__check_okay_to_chain()

        self.__empty = False
        self.__limit = limit
        return self
        
find 的返回结果 cursor 对象

cursor 对象 可以通过collection.find() 来返回一个 cursor 对象
cursor对象 可以实现了切片协议, 因此可以使用 切片操作.

cursor.count() 方法 可以查询 查询了多少 文档,返回文档总数.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
@Time    : 2019/4/3 11:56
@File    : test_cursor_getitem.py
@Author  : [email protected]
"""

from pymongo import MongoClient

from config.DB import SHOUFUYOU_REPORTING_URI, SHOUFUYOU_REPORTING_DB_NAME

client = MongoClient(SHOUFUYOU_REPORTING_URI, maxPoolSize=50)
mongo_db = client[SHOUFUYOU_REPORTING_DB_NAME]
collection = mongo_db['contacts']

domain = {"event_id": "1000073"}
fields = {'_id': 1, 'created_time': 1, 'event_id': 1}

# 切片操作.
values = collection.find(domain, fields)[2:5]

print(f"count:{values.count()}")

for item in values:
    print(item)

count:393
{'_id': ObjectId('5bc712afa90ec16ea20ff19f'), 'event_id': '1000073', 'created_time': '2018-10-17 18:45:03'}
{'_id': ObjectId('5bc86b34a90ec16e6c44dcca'), 'event_id': '1000073', 'created_time': '2018-10-18 19:15:00'}
{'_id': ObjectId('5bc87a85a90ec16e6e222242'), 'event_id': '1000073', 'created_time': '2018-10-18 20:20:21'}
关于cursor 对象我简单聊一下.

使用pymongo来操作mongodb数据库
使用pymongo来操作mongodb数据库

使用pymongo来操作mongodb数据库
使用pymongo来操作mongodb数据库

使用pymongo来操作mongodb数据库
使用pymongo来操作mongodb数据库

使用pymongo来操作mongodb数据库
使用pymongo来操作mongodb数据库

使用pymongo来操作mongodb数据库
使用pymongo来操作mongodb数据库

使用pymongo来操作mongodb数据库
使用pymongo来操作mongodb数据库

使用pymongo来操作mongodb数据库
使用pymongo来操作mongodb数据库

mongodb 读取 数据的工具类

实现 从 mongodb 中读取 数据, 通过配置 字段,以及筛选条件 来完成 参数的配置.

实现 read 方法 批量读取数据.

from pymongo import MongoClient
from config.DB import XINYONGFEI_RCS_GATEWAY_URI, XINYONGFEI_RCS_GATEWAY_DB_NAME
import logging

logger = logging.getLogger(__name__)



class MongoReader(BaseReader):
    def __init__(self, uri, db_name, collecion_name, domain, fields):
        """
        mongo reader    工具类
        :param url:  uri mongo 连接的URI
        :param db_name:  db名称
        :param collecion_name:  collection_name
        :param domain:   查询条件
        :param fields:  过滤字段  {"name":1,"_id":1}
        """
        super().__init__(url=uri)

        self._dbname = db_name
        self._collecion_name = collecion_name

        self.domain = domain
        self.fields = fields

        client = MongoClient(self.url)
        db = client[self._dbname]
        self.collecion = db[self._collecion_name]

        # 最大读取数量
        self.max_count = 30000000000000


    def read(self, start=0, step=1000):

        limit = step - start
        skip_number = start

        count = self.collecion.count_documents(filter=self.domain)
        logger.info(f"total count:{count}")
        while True:
            logger.info(f'limit:{limit},skip:{skip_number}, start:{skip_number-start},end:{skip_number+limit}')
            # cursor = self.collecion.find(self.domain, self.fields, no_cursor_timeout=True).limit(limit).skip(
            #     skip_number)

            cursor = self.collecion.find(self.domain, self.fields, no_cursor_timeout=True).limit(limit).skip(
                skip_number)

            # 查询数据量
            number = cursor.count(with_limit_and_skip=True)
            if number:
                yield [d for d in cursor]

            skip_number += number
            if number = self.max_count:
                logger.info("skip_number:{},self.max_count:{}.skip_number >= self.max_count,break".format(
                    skip_number,
                    self.max_count))
                # 把cursor 关掉
                cursor.close()
                break



if __name__ == '__main__':
    start_time = '2018-10-01 11:03:05'
    end_time = '2019-01-20 14:03:49'

    reader_cOnfig= {
        'uri': XINYONGFEI_RCS_GATEWAY_URI,
        'db_name': XINYONGFEI_RCS_GATEWAY_DB_NAME,
        'domain': {"$and": [{"created_time": {"$lt": end_time}}, {"created_time": {"$gte": start_time}},
                            {"method_id": "securityReport"}, {"status": "SUCCESS"}]},
        'fields': {"created_time": 1, "user_id": 1, "_id": 1},
        'collecion_name': 'moxieSecurityLog',
    }

    reader = MongoReader(**reader_config)

    for data in reader.read():
        print(data)
    print('frank')
    

错误总结:
1 CursorNotFound 错误, 报 cursor 没有找到

报错如下:
pymongo.errors.CursorNotFound: Cursor not found, cursor id: 387396591387

Exception in thread consumer_14:
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.6.4_2/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/frank/PycharmProjects/xinyongfei-bi-model/rcsdecisionv2.py", line 843, in run
    result = self.parse(values)
  File "/Users/frank/PycharmProjects/xinyongfei-bi-model/rcsdecisionv2.py", line 872, in parse
    for posts in posts_list:
  File "/Users/frank/PycharmProjects/xinyongfei-bi-model/venv3/lib/python3.6/site-packages/pymongo/cursor.py", line 1132, in next
    if len(self.__data) or self._refresh():
  File "/Users/frank/PycharmProjects/xinyongfei-bi-model/venv3/lib/python3.6/site-packages/pymongo/cursor.py", line 1075, in _refresh
    self.__max_await_time_ms))
  File "/Users/frank/PycharmProjects/xinyongfei-bi-model/venv3/lib/python3.6/site-packages/pymongo/cursor.py", line 947, in __send_message
    helpers._check_command_response(doc['data'][0])
  File "/Users/frank/PycharmProjects/xinyongfei-bi-model/venv3/lib/python3.6/site-packages/pymongo/helpers.py", line 207, in _check_command_response
    raise CursorNotFound(errmsg, code, response)
pymongo.errors.CursorNotFound: Cursor not found, cursor id: 387396591387

问题分析:
cursor 超时了.

设置参数 no_cursor_timeout = True



解决方案 :
demos = db['demo'].find({},{"_id": 0},no_cursor_timeout = True)
for cursor in demos:
        do_something()
demo.close() # 关闭游标

官方文档:
官方文档 默认 是10min , 就会关闭 cursor , 这里 可以设置一个 永不超时的参数.

no_cursor_timeout (optional): if False (the default), any returned cursor is closed by the server after 10 minutes of inactivity. If set to True, the returned cursor will never time out on the server. Care should be taken to ensure that cursors with no_cursor_timeout turned on are properly closed.

参考资料:

https://www.jianshu.com/p/a8551bd17b5b

http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.find

参考文档 :

1 api cursor http://api.mongodb.com/python/current/api/pymongo/cursor.html
2 api count_documents http://api.mongodb.com/python/current/api/pymongo/collection.html#pymongo.collection.Collection.count_documents
3 api collecion.html http://api.mongodb.com/python/current/api/pymongo/collection.html
4 api 排序操作 http://api.mongodb.com/python/current/api/pymongo/cursor.html#pymongo.cursor.Cursor.sort
5 mongodb tutorial http://api.mongodb.com/python/current/tutorial.html

1 Python3中PyMongo的用法 https://zhuanlan.zhihu.com/p/29435868
2 Python3 中PyMongo 的用法 https://cloud.tencent.com/developer/article/1005552
3 菜鸟用Python操作MongoDB,看这一篇就够了 https://cloud.tencent.com/developer/article/1169645
4 PyMongo 库使用基础使用速成教程 https://www.jianshu.com/p/acc57241f9f0

分享快乐,留住感动. 2019-04-03 21:12:05 --frank

推荐阅读
  • Web动态服务器Python基本实现
    Web动态服务器Python基本实现 ... [详细]
  • 本文介绍了 Oracle SQL 中的集合运算、子查询、数据处理、表的创建与管理等内容。包括查询部门号为10和20的员工信息、使用集合运算、子查询的注意事项、数据插入与删除、表的创建与修改等。 ... [详细]
  • 本文详细介绍了如何在 Django 项目中使用 Admin 管理后台,包括创建超级用户、启动项目、管理数据模型和修改用户密码等步骤。 ... [详细]
  • 在1995年,Simon Plouffe 发现了一种特殊的求和方法来表示某些常数。两年后,Bailey 和 Borwein 在他们的论文中发表了这一发现,这种方法被命名为 Bailey-Borwein-Plouffe (BBP) 公式。该问题要求计算圆周率 π 的第 n 个十六进制数字。 ... [详细]
  • 本文介绍了SIP(Session Initiation Protocol,会话发起协议)的基本概念、功能、消息格式及其实现机制。SIP是一种在IP网络上用于建立、管理和终止多媒体通信会话的应用层协议。 ... [详细]
  • 二维码的实现与应用
    本文介绍了二维码的基本概念、分类及其优缺点,并详细描述了如何使用Java编程语言结合第三方库(如ZXing和qrcode.jar)来实现二维码的生成与解析。 ... [详细]
  • 本文详细介绍了在Windows系统中如何配置Nginx以实现高效的缓存加速功能,包括关键的配置文件设置和示例代码。 ... [详细]
  • 说明Python教程正在编写中,欢迎大家加微信sinbam提供意见、建议、纠错、催更。drymail是一个邮件发送库,封装了Python的smtplib ... [详细]
  • C语言是计算机科学和编程领域的基石,许多初学者在学习过程中会感到困惑。本文将详细介绍C语言的基本概念、关键语法和实用示例,帮助你快速上手C语言。 ... [详细]
  • 本文详细解析了Python中的条件语句,重点介绍了if-elif-else结构的使用方法和语法要点。通过丰富的示例和详细的解释,帮助读者深入理解这一核心编程概念,适合各水平的开发者参考学习。 ... [详细]
  • 本文介绍了如何利用ObjectMapper实现JSON与JavaBean之间的高效转换。ObjectMapper是Jackson库的核心组件,能够便捷地将Java对象序列化为JSON格式,并支持从JSON、XML以及文件等多种数据源反序列化为Java对象。此外,还探讨了在实际应用中如何优化转换性能,以提升系统整体效率。 ... [详细]
  • 我正在为我的Flask网络应用程序使用geopy库。我想将我从模态(html ... [详细]
  • 如何用Python在MongoDB中导入JSON文件? ... [详细]
  • 大部分情况下爬取的数据特别灵活,不一定只有指定的几个字段数据,这时候就需要将数据存储在非关系型数据库中了,MongoDB是由C语言编写的& ... [详细]
  • 很简单的一步操作,是在python中运行,而不是一个python文件,找 ... [详细]
author-avatar
good7758
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有