python模块 / python记录 · 2021年12月20日

python(pymysql 模块 链接数据库)

# Python 3.7连接到MySQL数据库的模块推荐使用PyMySQL模块
# pip install pymysql
#  /usr/local/mysql/support-files/mysql.server start
# 一般流程
# 开始-创建connection-获取cursor-CRUD(查询并获取数据)-关闭cursor-关闭connection-结束
import pymysql

dbInfo = {
    'host' : 'localhost',
    'port' : 3306,
    'user' : 'root',
    'password' : 'Sooele0000%',
    'db' : 'world'
}
#数据库ip地址
#端口
#用户名
#密码
#数据库名
sqls = ['select 1', 'select VERSION()']

result = []

class ConnDB(object):
    def __init__(self, dbInfo, sqls):
        self.host = dbInfo['host']
        self.port = dbInfo['port']
        self.user = dbInfo['user']
        self.password = dbInfo['password']
        self.db = dbInfo['db']
        self.sqls = sqls

        self.run()

    def run(self):
        conn = pymysql.connect(
            host = self.host,
            port = self.port,
            user = self.user,
            password = self.password,
            db = self.db
        )
        # 游标建立的时候就开启了一个隐形的事物
        cur = conn.cursor()
        try:
            for command in self.sqls:
                cur.execute(command)
                result.append(cur.fetchone())
            # 关闭游标
            cur.close()
            conn.commit()
        except:
            conn.rollback()
        # 关闭数据库连接
        conn.close()

if __name__ == "__main__":
    db = ConnDB(dbInfo, sqls)
    print(result)

 

# Python 3.7连接到MySQL数据库的模块推荐使用PyMySQL模块
# pip install pymysql
#  /usr/local/mysql/support-files/mysql.server start
# 一般流程
# 开始-创建connection-获取cursor-CRUD(查询并获取数据)-关闭cursor-关闭connection-结束
import pymysql

dbInfo = {
    'host' : 'localhost',
    'port' : 3306,
    'user' : 'root',
    'password' : 'Sooele0000%',
    'db' : 'world'
}
#数据库ip地址
#端口
#用户名
#密码
#数据库名
sqls = ['select 1', 'select VERSION()']

result = []

class ConnDB(object):
    def __init__(self, dbInfo, sqls):
        self.host = dbInfo['host']
        self.port = dbInfo['port']
        self.user = dbInfo['user']
        self.password = dbInfo['password']
        self.db = dbInfo['db']
        self.sqls = sqls

        self.run()

    def run(self):
        conn = pymysql.connect(
            host = self.host,
            port = self.port,
            user = self.user,
            password = self.password,
            db = self.db
        )
        # 游标建立的时候就开启了一个隐形的事物
        cur = conn.cursor()
        try:
            for command in self.sqls:
                cur.execute(command)
                result.append(cur.fetchone())
            # 关闭游标
            cur.close()
            conn.commit()
        except:
            conn.rollback()
        # 关闭数据库连接
        conn.close()

if __name__ == "__main__":
    db = ConnDB(dbInfo, sqls)
    print(result)
# 执行批量插入
values = [(id,'testuser'+str(id)) for id in range(4, 21) ]
cursor.executemany('INSERT INTO '+ TABLE_NAME +' values(%s,%s)' ,values)
# 练习
# 1 pymysql是否是线程安全的?如何使用ThreadPoolExecutor与pymysql实现多线程访问数据库

 

##########表结构############

CREATE TABLE `数据库名`.`表名`  (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `title` varchar(200) DEFAULT NULL COMMENT '文章标题',
  PRIMARY KEY (`id`)
);
######## settings.py ##########
#mysql-config
MYSQL_HOST = 'localhost'
MYSQL_DBNAME = 'scrapy'
MYSQL_USER = 'root'
MYSQL_PASSWD ='rootroot'
MYSQL_PORT = 3306



######## pipelines.py ##########
# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html
import json
from logging import Logger
from twisted.enterprise import adbapi
from blogscrapy.db.dbhelper import DBHelper
import codecs
from logging import log
from scrapy.utils.project import get_project_settings
class BlogscrapyPipeline(object):
    def __init__(self):
        self.file = open('blog.json', 'a+', encoding='utf-8')
        self.db = DBHelper()
    def process_item(self, item, spider):
        content = json.dumps(dict(item), ensure_ascii=False) + "\n"
        self.file.write(content)
        self.db.insert(item)
        return item
    def close_spider(self, spider):
        self.file.close()



######## DBHelper.py ##########
# -*- coding: utf-8 -*-
import pymysql
from twisted.enterprise import adbapi
from scrapy.utils.project import get_project_settings
class DBHelper():
    def __init__(self):
        settings = get_project_settings()
        dbparams = dict(
            host=settings['MYSQL_HOST'],
            db=settings['MYSQL_DBNAME'],
            user=settings['MYSQL_USER'],
            passwd=settings['MYSQL_PASSWD'],
            charset='utf8mb4', 
            cursorclass=pymysql.cursors.DictCursor,
            use_unicode=False,
        )
        # 将字典扩展为关键字参数
        dbpool = adbapi.ConnectionPool('pymysql', **dbparams)
        self.__dbpool = dbpool

    def connect(self):
        return self.__dbpool

    def insert(self, item):
        # 封装insert操作
        sql = "insert into article(title) values(%s)"
        query = self.__dbpool.runInteraction(self._conditional_insert, sql, item)
        query.addErrback(self._handle_error)
        return item

    def _conditional_insert(self, canshu, sql, item):
        # 传items的数据
        params = (item['title'])
        canshu.execute(sql, params)
    def _handle_error(self, failue):
        print(failue)
    def __del__(self):
        try:
            self.__dbpool.close()
        except Exception as ex:
            print(ex)

 

 

 

 

 

一、数据库增删改操作
commit()方法:在数据库里增、删、改的时候,必须要进行提交,否则插入的数据不生效。
executemany():用来同时插入多条数据:

import pymysql
config={
    "host":"127.0.0.1",
    "user":"root",
    "password":"LBLB1212@@",
    "database":"dbforpymysql"
}
db = pymysql.connect(**config)
cursor = db.cursor()
sql = "INSERT INTO userinfo(username,passwd) VALUES(%s,%s)"
cursor.executemany(sql,[("tom","123"),("alex",'321')])
db.commit()  #提交数据
cursor.close()
db.close()

execute()和executemany()都会返回受影响的行数:

sql = "delete  from  userinfo where username=%s"
res = cursor.executemany(sql,("jack",))
print("res=",res)
#运行结果
res= 1

当表中有自增的主键的时候,可以使用lastrowid来获取最后一次自增的ID:

import pymysql
config={
    "host":"127.0.0.1",
    "user":"root",
    "password":"LBLB1212@@",
    "database":"dbforpymysql"
}
db = pymysql.connect(**config)
cursor = db.cursor()
sql = "INSERT INTO userinfo(username,passwd) VALUES(%s,%s)"
cursor.execute(sql,("zed","123"))
print("the last rowid is ",cursor.lastrowid)
db.commit()  #提交数据
cursor.close()
db.close()

#运行结果
the last rowid is  10

 

 

二、数据库的查询操作

fetchone():获取下一行数据,第一次为首行;
fetchall():获取所有行数据源
fetchmany(4):获取下4行数据

import pymysql
config={
    "host":"127.0.0.1",
    "user":"root",
    "password":"@@@@@",
    "database":"dbforpymysql"
}
db = pymysql.connect(**config)
cursor = db.cursor()
sql = "SELECT * FROM userinfo"
cursor.execute(sql)
res = cursor.fetchone() #第一次执行
print(res)
res = cursor.fetchone() #第二次执行
print(res)
cursor.close()
db.close()


import pymysql
config={
    "host":"127.0.0.1",
    "user":"root",
    "password":"@@@@@",
    "database":"dbforpymysql"
}
db = pymysql.connect(**config)
cursor = db.cursor()
sql = "SELECT * FROM userinfo"
cursor.execute(sql)
res = cursor.fetchall() #第一次执行
print(res)
res = cursor.fetchall()  #第二次执行
print(res)
cursor.close()
db.close()
可以看到,第二次获取的时候,什么数据都没有获取到,这个类似于文件的读取操作。

默认情况下,我们获取到的返回值是元组,只能看到每行的数据,却不知道每一列代表的是什么,这个时候可以使用以下方式来返回字典,每一行的数据都会生成一个字典:

cursor = db.cursor(cursor=pymysql.cursors.DictCursor)  #在实例化的时候,将属性cursor设置为pymysql.cursors.DictCursor

import pymysql
config={
    "host":"127.0.0.1",
    "user":"root",
    "password":"LBLB1212@@",
    "database":"dbforpymysql"
}
db = pymysql.connect(**config)
cursor = db.cursor(cursor=pymysql.cursors.DictCursor)
sql = "SELECT * FROM userinfo"
cursor.execute(sql)
res = cursor.fetchall()
print(res)
cursor.close()
db.close()

这样获取到的内容就能够容易被理解和使用了!

在获取行数据的时候,可以理解开始的时候,有一个行指针指着第一行的上方,获取一行,它就向下移动一行,所以当行指针到最后一行的时候,就不能再获取到行的内容,所以我们可以使用如下方法来移动行指针:

cursor.scroll(1,mode='relative')  # 相对当前位置移动
cursor.scroll(2,mode='absolute') # 相对绝对位置移动
第一个值为移动的行数,整数为向下移动,负数为向上移动,mode指定了是相对当前位置移动,还是相对于首行移动

sql = "SELECT * FROM userinfo"
cursor.execute(sql)
res = cursor.fetchall()
print(res)
cursor.scroll(0,mode='absolute') #相对首行移动了0,就是把行指针移动到了首行
res = cursor.fetchall()  #第二次获取到的内容
print(res)

#运行结果
[{'id': 1, 'username': 'frank', 'passwd': '123'}, {'id': 2, 'username': 'rose', 'passwd': '321'}, {'id': 3, 'username': 'jeff', 'passwd': '666'}, {'id': 5, 'username': 'bob', 'passwd': '123'}, {'id': 8, 'username': 'jack', 'passwd': '123'}, {'id': 10, 'username': 'zed', 'passwd': '123'}]
[{'id': 1, 'username': 'frank', 'passwd': '123'}, {'id': 2, 'username': 'rose', 'passwd': '321'}, {'id': 3, 'username': 'jeff', 'passwd': '666'}, {'id': 5, 'username': 'bob', 'passwd': '123'}, {'id': 8, 'username': 'jack', 'passwd': '123'}, {'id': 10, 'username': 'zed', 'passwd': '123'}]



在python的文件操作中支持上下文管理器,在操作数据库的时候也可以使用:

import pymysql
config={
    "host":"127.0.0.1",
    "user":"root",
    "password":"LBLB1212@@",
    "database":"dbforpymysql"
}
db = pymysql.connect(**config)
with db.cursor(cursor=pymysql.cursors.DictCursor) as cursor:  #获取数据库连接的对象
    sql = "SELECT * FROM userinfo"   
    cursor.execute(sql)
    res = cursor.fetchone()
    print(res)
    cursor.scroll(2,mode='relative')
    res = cursor.fetchone()
    print(res)
    cursor.close()
db.close()

#运行结果
{'id': 1, 'username': 'frank', 'passwd': '123'}
{'id': 5, 'username': 'bob', 'passwd': '123'}