aiomysql 是一个支持异步访问mysql的python库,它依赖并重用PyMySQL的大部分部分代码,tornado 结合aiomysql 能够异步且高效的对mysql进行访问操作。
安装方式
pip install aiomysql
异步代码的编写对于初学者而言,只需要注意使用await关键字就可以了,其他的与同步代码几乎相同,这也是async/await 关键字的优点,它让异步代码看起来与同步代码相似。
为了能在tornado 服务里更好的使用aiomysql, 我封装了一个AioMysqlClient 类,用来实现对mysql数据库的访问功能
db.py
import aiomysql.cursors
import asyncio
import traceback
from tornado.ioloop import IOLoop
class AioMysqlClient():
    def __init__(self, host, port, username, password, db_name, **kwargs):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.db_name = db_name
        self.kwargs = kwargs            # 其他参数
        self.conn_pool = None           # 连接池
        self.is_connected = False       # 是否处于连接状态
        self.lock = asyncio.Lock()      # 异步锁
    async def init_pool(self):
        """
        创建数据库连接
        :return:
        """
        print("init_pool")
        try:
            self.conn_pool = await aiomysql.create_pool(
                host=self.host,
                port=self.port,
                user=self.username,
                password=self.password,
                db=self.db_name,
                **self.kwargs
            )
            self.is_connected = True
        except:
            print(traceback.format_exc())
            self.is_connected = False
        return self
    async def insert(self, sql, args= None):
        conn = await self.conn_pool.acquire()
        cur = await self.execute(conn, sql, args)
        await conn.commit()
        conn.close()            # 不是关闭连接,而是还到连接池中
        return cur
    async def fetch_one(self, sql, args=None):
        conn = await self.conn_pool.acquire()
        cur = await self.execute(conn, sql, args)
        if cur.rowcount == 0:
            return None
        return await cur.fetchone()
    async def fetch_all(self, sql, args=None):
        conn = await self.conn_pool.acquire()
        cur = await self.execute(conn, sql, args)
        if cur.rowcount == 0:
            return None
        return await cur.fetchall()
    async def execute(self, conn, sql, args=None):
        async with conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute(sql, args)            # 执行sql
            return cur
async def test_insert():
    mysql_client = AioMysqlClient("127.0.0.1", 6606, 'root', 'root', 'test')
    await mysql_client.init_pool()
    sql = "insert into student(name, age)values('小明', 14)"
    print("执行sql")
    cur = await mysql_client.insert(sql)
    print(cur.lastrowid)
async def test_select():
    mysql_client = AioMysqlClient("127.0.0.1", 6606, 'root', 'root', 'test')
    await mysql_client.init_pool()
    sql = "select * from student"
    data = await mysql_client.fetch_one(sql)
    print(data)
    datas = await mysql_client.fetch_all(sql)
    print(datas)
if __name__ == "__main__":
    asyncio.run(test_select())
AioMysqlClient 在内部维护了一个连接池,使用的时候,通过acquire方法获取一个空闲的连接,连接使用结束后,通过close方法归还给连接池。
app.py
import json
import asyncio
import tornado.ioloop
from tornado.web import RequestHandler, Application
from tornado.httpserver import HTTPServer
from tornado.options import options, define
from tornado.ioloop import IOLoop
from db import AioMysqlClient
define('port', default=8000, help='监听端口')
async def connect_mysql():
    mysql_client = AioMysqlClient("127.0.0.1", 6606, 'root', 'root', 'test')
    await mysql_client.init_pool()
    return mysql_client
class BaseHnadler(RequestHandler):
    def initialize(self):
        self.mysql_client = self.settings['mysql']
class StudentsHandler(BaseHnadler):
    async def post(self):
        data = json.loads(self.request.body)
        sql = f"insert into student(name, age)values('{data['name']}', {data['age']})"
        cur = await self.mysql_client.insert(sql)
        return self.write(json.dumps({'id': cur.lastrowid}))
class StudentInfoHandler(BaseHnadler):
    async def get(self, id):
        sql = f"select * from student where id = {id}"
        data = await self.mysql_client.fetch_one(sql)
        return self.write(json.dumps(data))
def make_app(config):
    options.parse_command_line()
    handlers_routes = [
        (r'/students', StudentsHandler),
        (r'/students/(.*)', StudentInfoHandler)
    ]
    app = Application(handlers=handlers_routes, **config)
    return app
def main():
    config = {
        'mysql': IOLoop.current().run_sync(connect_mysql)
    }
    app = make_app(config)
    http_server = HTTPServer(app)
    http_server.listen(options.port)
    print('ok')
    tornado.ioloop.IOLoop.current().start()
if __name__ == '__main__':
    main()
RequestHandler 可以共用一个数据库连接池,初始化操作放在connect_mysql函数中进行,通过IOLoop.current().run_sync 方法运行,如果connect_mysql是异步的,那么run_sync 会一直运行直到有结果,必须这样处理,才能在server启动前完成mysql连接的初始化操作。
通过向Application 初始化函数传入config, 在RequestHandler 中就可以通过self.settings['mysql'] 获得AioMysqlClient 实例对象。
奇怪的是,以上代码在windows系统下无法正常运行,只能在linux系统下正确执行,问题就出在run_sync 上,总是报错
RuntimeError: no running event loop
关于这个问题,慢慢研究吧,等有了结果,我会更新这篇教程。
BaseHnadler 是所有处理请求类的父类,在这里可以编写所有子类都会用到的代码,RequestHandler 在被创建以后,会调用initialize 做初始化操作,在这里我定义了mysql_client, 后续的调用会更加方便。
现在,服务已经可以正常启动了,接下来,要多一些测试来验证我们的代码是正确的
import requests
def test_add():
    data = {
        'name': '小刚',
        'age': 15
    }
    res = requests.post('http://127.0.0.1:8000/students', json=data)
    print(res.json())
if __name__ == '__main__':
    test_add()
这段代码用来测试mysql新增,测试查询请求,可以直接通过浏览器访问
http://127.0.0.1:8000/students/1
 
            扫描关注, 与我技术互动
QQ交流群: 211426309
 
                        分享日常研究的python技术和遇到的问题及解决方案