python使用flask实现leveldb的rest api

     马上就要放假了,因为临过年了,也没啥事干… …   公司有个数据抽取的模块,是需要一个临时的又高性能的本地KV数据库, leveldb算是个好选择。 以前在人人的时候,我的好多项目都用了leveldb,只是后期改用ssdb的多点。 leveldb在百万数据后,速度明显不行… …  对于现在抽取来说来说,还是够用的了。 

     

     关于leveldb的安装和介绍我就不多说了,没啥意思…  …  我这边主要是介绍python leveldb库的使用。。。 。。。 


import leveldb
db = leveldb.LevelDB('./db')
db.Put('hello', 'world')
print db.Get('hello')

db.Delete('hello')
db.Get('hello')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
KeyError
for i in xrange(10):
  db.Put(str(i), 'string_%s' % i)

print list(db.RangeIter(key_from = '2', key_to = '5'))
[('2', 'string_2'), ('3', 'string_3'), ('4', 'string_4'), ('5', 'string_5')]
batch = leveldb.WriteBatch()
for i in xrange(1000):
   db.Put(str(i), 'string_%s' % i)

db.Write(batch, sync = True)

然后我们可以用FLask封装下leveldb api,做个rest api 。 Flask 默认是单进程单线程的,很容易造成io堵塞的,所以这里用gevent协程来做wsgi  

import argparse
import leveldb
import ujson as json
from flask import request, g

from flatdb import flatdb_app


JSON = {'Content-Type': 'application/json'}


def ensure_db():
    if 'db' not in g:
        g.db = leveldb.LevelDB(flatdb_app.config['DB'])


def put():
    ensure_db()
    keys = request.args.items(multi=True)
    batch = leveldb.WriteBatch()
    for k, v in keys:
        batch.Put(k, v)
    g.db.Write(batch)
    return '', 201, JSON


def get():
    ensure_db()
    keys = request.args.getlist('key')
    if not keys:
        return '', 204, JSON
    response = {}
    for k in keys:
        try:
            response[k] = g.db.Get(k)
        except KeyError:
            pass
    if not response:
        return '', 404, JSON
    return json.dumps(response), 200, JSON


def getrange():
    ensure_db()
    from_key = request.args.get('from')
    response = {}
    vals = g.db.RangeIter(key_from=from_key)
    for k, v in vals:
        response[k] = v
    if not response:
        return '', 404, JSON
    return json.dumps(response), 200, JSON


def delete():
    ensure_db()
    keys = request.args.getlist('key')
    batch = leveldb.WriteBatch()
    for k in keys:
        batch.Delete(k)
    g.db.Write(batch)
    return '', 200, JSON


def define_urls(app):
    app.add_url_rule('/put', view_func=put, methods=['GET'])
    app.add_url_rule('/get', view_func=get, methods=['GET'])
    app.add_url_rule('/getrange', view_func=getrange, methods=['GET'])
    app.add_url_rule('/delete', view_func=delete, methods=['GET'])


def get_options():
    parser = argparse.ArgumentParser()
    parser.add_argument('-d', '--debug', action='store_true', default=False)
    parser.add_argument('-p', '--port', type=int, default=7532)
    parser.add_argument('-b', '--database')
    parser.add_argument('-H', '--host', default='127.0.0.1')
    return parser.parse_args()


def configure_app(app, options):
    app.config['DB'] = options.database
    define_urls(app)


def dev_server():
    options = get_options()
    configure_app(flatdb_app, options)
    flatdb_app.run(debug=options.debug, port=options.port, host=options.host)


def run_server():
    options = get_options()
    configure_app(flatdb_app, options)

    from gevent.wsgi import WSGIServer

    server = WSGIServer((options.host, options.port), flatdb_app)
    server.serve_forever()


if __name__ == '__main__':
    run_server()

祝大家新年快乐….    原本是打算这两天写个2015的计划书,结果…..  累了,就这么招吧。  


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc

2 Responses

  1. toontong@163.com 2015年3月30日 / 下午2:42

    levelDB是嵌入式的,使用的是本地IO, gevent只能处理网络io为异步,本地io一样会引起阻塞。

    • 峰云就她了 2015年3月30日 / 下午6:36

      恩,对的 还是会堵塞的. 用gevent做http api,是为了解决flask性能的。 leveldb本身是很快,但是flask在一个量级下会堵塞的。

发表评论

电子邮件地址不会被公开。 必填项已用*标注