继续折腾之超大文件读取入库

最近很头大,

1、免费的SSR不稳定 前几天好好的 这几天不知道抽什么风

2、有200GB json文件 检索的时候根本打不开,只能入库查询了,适合存储的 redis mongodb  什么分布式啦 这些都不会

6.jpg

redis 据说 写入80000/s 还阔以

完全不知道从哪下手 硬着头皮干吧

分块多线程读取 

import threading
import time
import os
import threading
import pymongo
import json
class fileSplit:
    def __init__(self,filePath,threadNum):
        self.filePath = filePath
        self.blockNum = threadNum
        self.totalSize = os.path.getsize(filePath)
        self.blockSize = self.totalSize / self.blockNum

        print("文件总大小"+str(self.totalSize)+" Bytes")
        print("文件块数"+str(self.blockNum)+" 个")
        print("每块大小"+str(self.blockSize)+" Bytes")

    def subfile(self):
        #fd = open(self.filePath)
        posList = []
        startpof = 0
        for i in range(self.blockNum):
            '''
            #最后一块
            if i == self.blockNum - 1:
                endpof = self.totalSize - 1
                posList.append((startpof,endpof))
                break
            endpof = startpof + self.blockSize -1
            if endpof >= self.totalSize:
                endpof = self.totalSize -1
            if startpof >= self.totalSize:
                break
            posList.append((startpof, endpof))
            startpof = endpof+1
            '''
            # 最后一块
            if i+1 == self.blockNum:
                endpof = self.totalSize-1
                posList.append((startpof, endpof))
                break
            endpof = startpof + self.blockSize + 1

            if endpof>=self.totalSize:
                endpof = self.totalSize -1
            if startpof >= self.totalSize:
                break
            posList.append((startpof, endpof))
            startpof = endpof + 1

        return  posList

class Reader(threading.Thread):
    def __init__(self,filePath,startpos,endpos):
        super(Reader, self).__init__()
        self.filePath = filePath
        self.startpos = startpos
        self.endpos = endpos
        self.buf = []
    def run(self):

        fd = open(self.filePath, "r")
        '''
        该if块主要判断分块后的文件块的首位置是不是行首,
        是行首的话,不做处理
        否则,将文件块的首位置定位到下一行的行首
        '''

        if self.startpos != 0:
            fd.seek(self.startpos - 1)
            if fd.read(1) != '\n':
                line = fd.readline()
                self.startpos = fd.tell()
        fd.seek(self.startpos)

        '''
        对该文件块进行处理
        '''
        while (self.startpos <= self.endpos):
                line = fd.readline()
                jsonp = json.loads(line)
                time.sleep(1)
                print(line)

                #print(type(jsonp))
                #currColletion.insert_one(jsonp)
                #time.sleep(10)
                self.startpos = fd.tell()
                #if len(self.buf)>=10:
                    #self.buf = []
                #self.buf.append(jsonp)
                #print(len(self.buf))


if __name__ == '__main__':
    try:
        filename = "2019-08-30-1567123524-fdns_mx.json"
        #mongo对象
        mgclient = pymongo.MongoClient("mongodb://localhost:27017/")

        collection = mgclient['fuck86']

        #if filename in collection.list_collection_names():
            #print(filename + "已存在")

        #当前集合
        currColletion = collection[filename]

        filePath = "E:/"+filename

        threadNum = 100

        fp = fileSplit(filePath, threadNum)
        posList = fp.subfile()

        # 起始时间
        startTime = time.clock()
        t = []
        # 生成线程
        for i in range(threadNum):
            t.append(Reader(filePath, *posList[i]))
        # 开启线程
        for i in range(threadNum):
            t[i].start()
        for i in range(threadNum):
            t[i].join()
        # 结束时间
        endTime = time.clock()

        print("Cost time is %f" % (endTime - startTime))

    except Exception as e:
        print(e)

这里用了将文件分块,每个线程读一块内容 需要精确文件的起始位置 只能一行一行插 插了一小时才千百万的样子 我这有20亿 猴年马月能插完

jj6.png

既然这样不大好,,, 看了下mongodb 批量插会比单条插效率高的很多

批量插

import linecache
import time
import pymongo
import json
filename = "2019-08-30-1567123524-fdns_mx.json"
filepath = "E:/"+filename

#mongo对象
mgclient = pymongo.MongoClient("mongodb://localhost:27017/")

collection = mgclient['fuck86']

#if filename in collection.list_collection_names():
    #print(filename + "已存在")

#当前集合
currColletion = collection[filename]

buf = []
with open(filepath,"rb") as fp:
    for line in fp:
        jsonstr = line.decode().strip("\n")
        jsonp = json.loads(jsonstr)
        #print(type(jsonp))
        if len(buf) >= 20000:
            result = currColletion.insert_many(buf)
            print(result.inserted_ids)
            time.sleep(1)
            buf = []
        buf.append(jsonp)
        #print(buf)


这里只能用rb读取效率更高,批量插用到的 insert_many

这里顺便也测试下 一秒中插数据的上限 插不完 好像mongodb 一直在队列中等 所以这里给个1m延时 这里就么有用多线程了 反正也摸不懂多线程 干脆慢慢跑去吧

34.png

 

emmm。。。这样一秒2w慢慢插吧 。。。继续折腾

本博客所有文章如无特别注明均为原创。作者:odaycaogen复制或转载请以超链接形式注明转自 123``blog
原文地址《继续折腾之超大文件读取入库

相关推荐

发表评论

路人甲 表情
Ctrl+Enter快速提交

网友评论(0)