# Page tree
# Skip to end of metadata
# Go to start of metadata
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import gevent.monkey
gevent.monkey.patch_all()
 
import msgpack
import redis
import gevent
from gevent.pool import Pool
from gevent.queue import Queue, Empty
 
# Rows finished by process_row() wait here until buffered_to_redis() drains them.
item_queue = Queue()
# Greenlet pool that main() uses to run process_row(); created with size 1000.
pool = Pool(1000)
# Redis client that process_row() queries with each row's "ei" value.
dmp = redis.StrictRedis(host="127.0.0.1", port=18014)
 
 
def save2redis(item_list):
    """Placeholder sink for a batch of processed rows.

    Walks ``item_list`` exactly once and discards every element; no write is
    performed here. Returns ``None``.
    """
    for _row in item_list:
        # Deliberate no-op: each row is visited but nothing is done with it.
        continue
 
 
def buffered_to_redis(batch_size=1000):
    """Drain ``item_queue`` and hand its rows to ``save2redis`` in batches.

    Iterates ``item_queue`` until iteration ends (``main`` signals this by
    putting ``StopIteration`` on the queue). Rows are accumulated in a local
    buffer; whenever the buffer reaches ``batch_size`` rows it is passed to
    ``save2redis`` and a new buffer is started. Any rows left over when the
    queue is exhausted are flushed as a final, smaller batch.

    Args:
        batch_size: Number of rows to collect before each flush. Defaults to
            1000, the value that used to be hard-coded.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    item_buffer = []
    for item in item_queue:
        item_buffer.append(item)
        if len(item_buffer) >= batch_size:
            save2redis(item_buffer)
            # Rebind instead of clearing in place so the list that was just
            # handed to save2redis is never mutated afterwards.
            item_buffer = []
    # Flush the partial batch that remains once the queue is exhausted.
    if item_buffer:
        save2redis(item_buffer)
 
 
def process_row(row):
    """Look up the stored record for ``row`` and enqueue the enriched row.

    Reads ``row["ei"]`` (``None`` if the key is absent) and uses it as the key
    for ``dmp.get``. The result is stored under ``row["dmp"]``: ``None`` when
    the lookup returned ``None``, otherwise the value decoded with
    ``msgpack.loads``. The (mutated) row is then put on ``item_queue``.
    """
    payload = dmp.get(row.get("ei"))
    row["dmp"] = msgpack.loads(payload) if payload is not None else None
    item_queue.put(row)
 
 
def process_row_queue():
    """Unimplemented placeholder; does nothing and returns ``None``."""
    return None
 
 
def main():
    """Feed every line of ``ei.log`` through the lookup/buffer pipeline.

    Starts ``buffered_to_redis`` as a consumer greenlet, then spawns one
    ``process_row`` greenlet on ``pool`` for each line of ``ei.log`` (the
    stripped line becomes the row's ``"ei"`` value). Once all row greenlets
    have finished, ``StopIteration`` is put on ``item_queue`` to end the
    consumer's iteration, and the consumer is waited for.
    """
    consumer = gevent.spawn(buffered_to_redis)

    with open("ei.log") as log_file:
        for raw_line in log_file:
            pool.spawn(process_row, dict(ei=raw_line.strip()))
    # Wait until every process_row greenlet has put its row on the queue.
    pool.join()

    # Sentinel: ends the consumer's ``for item in item_queue`` loop.
    item_queue.put(StopIteration)
    consumer.join()
 
 
 
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()


#   • No labels
# Write a comment...