#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Look up keys read from ``ei.log`` in Redis and hand the results to a sink.

Each stripped line of ``ei.log`` becomes a row ``{"ei": <line>}``. A pool of
greenlets fetches the value stored under that key in Redis, msgpack-decodes
it into ``row["dmp"]`` (``None`` when the key is missing) and pushes the row
onto ``item_queue``. A single consumer greenlet drains the queue and passes
the rows to ``save2redis`` in batches.
"""
import gevent.monkey
# Patch the standard library before the remaining imports are loaded, so the
# modules imported below pick up gevent's cooperative replacements.
gevent.monkey.patch_all()

import msgpack
import redis
import gevent
from gevent.pool import Pool
from gevent.queue import Queue, Empty

# Rows that have been looked up and are waiting to be written out.
item_queue = Queue()
# Upper bound on the number of lookup greenlets alive at the same time.
pool = Pool(1000)
# Redis instance holding the msgpack-encoded values, keyed by the "ei" value.
dmp = redis.StrictRedis(host="127.0.0.1", port=18014)


def save2redis(item_list):
    """Consume one batch of rows.

    This is currently a stub: it walks ``item_list`` and does nothing with
    the items.

    :param item_list: iterable of row dicts produced by ``process_row``.
    """
    for item in item_list:
        pass


def buffered_to_redis():
    """Drain ``item_queue`` and forward the rows to ``save2redis`` in batches.

    Iteration over the queue ends when the ``StopIteration`` sentinel put by
    ``main`` is reached. A batch is flushed as soon as it holds 1000 rows;
    whatever is left once the queue is exhausted is flushed at the end.
    """
    item_buffer = []
    for item in item_queue:
        item_buffer.append(item)
        if len(item_buffer) >= 1000:
            save2redis(item_buffer)
            # Start a new list instead of clearing in place, so the batch
            # just handed to save2redis is never mutated afterwards.
            item_buffer = []
    if item_buffer:
        save2redis(item_buffer)


def process_row(row):
    """Attach the Redis value for ``row["ei"]`` to the row and enqueue it.

    The value is stored under ``row["dmp"]``: ``None`` when Redis has no
    entry for the key, otherwise the msgpack-decoded payload. The row is
    modified in place and then put on ``item_queue``.

    :param row: dict with an ``"ei"`` entry used as the Redis key.
    """
    device = row.get("ei")
    dmpdata = dmp.get(device)
    if dmpdata is None:
        row["dmp"] = None
    else:
        row["dmp"] = msgpack.loads(dmpdata)
    item_queue.put(row)


def process_row_queue():
    """Placeholder; currently does nothing."""
    pass


def main():
    """Run the pipeline over ``ei.log``.

    Starts the consumer greenlet, spawns one lookup greenlet per input line
    through ``pool``, waits for all lookups to finish, then signals the
    consumer to stop and waits for it to flush its last batch.
    """
    consumer = gevent.spawn(buffered_to_redis)
    with open("ei.log") as fp:
        for line in fp:
            row = {"ei": line.strip()}
            pool.spawn(process_row, row)
    # Every row must be on the queue before the end-of-stream sentinel is
    # added, otherwise the consumer would stop early and drop rows.
    pool.join()
    item_queue.put(StopIteration)
    gevent.joinall([consumer])


if __name__ == "__main__":
    main()
# Add Comment