# Python 3.7连接到MySQL数据库的模块推荐使用PyMySQL模块 # pip install pymysql # /usr/local/mysql/support-files/mysql.server start # 一般流程 # 开始-创建connection-获取cursor-CRUD(查询并获取数据)-关闭cursor-关闭connection-结束 import pymysql dbInfo = { 'host' : 'localhost', 'port' : 3306, 'user' : 'root', 'password' : 'Sooele0000%', 'db' : 'world' } #数据库ip地址 #端口 #用户名 #密码 #数据库名 sqls = ['select 1', 'select VERSION()'] result = [] class ConnDB(object): def __init__(self, dbInfo, sqls): self.host = dbInfo['host'] self.port = dbInfo['port'] self.user = dbInfo['user'] self.password = dbInfo['password'] self.db = dbInfo['db'] self.sqls = sqls self.run() def run(self): conn = pymysql.connect( host = self.host, port = self.port, user = self.user, password = self.password, db = self.db ) # 游标建立的时候就开启了一个隐形的事物 cur = conn.cursor() try: for command in self.sqls: cur.execute(command) result.append(cur.fetchone()) # 关闭游标 cur.close() conn.commit() except: conn.rollback() # 关闭数据库连接 conn.close() if __name__ == "__main__": db = ConnDB(dbInfo, sqls) print(result)
# Python 3.7连接到MySQL数据库的模块推荐使用PyMySQL模块 # pip install pymysql # /usr/local/mysql/support-files/mysql.server start # 一般流程 # 开始-创建connection-获取cursor-CRUD(查询并获取数据)-关闭cursor-关闭connection-结束 import pymysql dbInfo = { 'host' : 'localhost', 'port' : 3306, 'user' : 'root', 'password' : 'Sooele0000%', 'db' : 'world' } #数据库ip地址 #端口 #用户名 #密码 #数据库名 sqls = ['select 1', 'select VERSION()'] result = [] class ConnDB(object): def __init__(self, dbInfo, sqls): self.host = dbInfo['host'] self.port = dbInfo['port'] self.user = dbInfo['user'] self.password = dbInfo['password'] self.db = dbInfo['db'] self.sqls = sqls self.run() def run(self): conn = pymysql.connect( host = self.host, port = self.port, user = self.user, password = self.password, db = self.db ) # 游标建立的时候就开启了一个隐形的事物 cur = conn.cursor() try: for command in self.sqls: cur.execute(command) result.append(cur.fetchone()) # 关闭游标 cur.close() conn.commit() except: conn.rollback() # 关闭数据库连接 conn.close() if __name__ == "__main__": db = ConnDB(dbInfo, sqls) print(result)
# 执行批量插入 values = [(id,'testuser'+str(id)) for id in range(4, 21) ] cursor.executemany('INSERT INTO '+ TABLE_NAME +' values(%s,%s)' ,values) # 练习 # 1 pymysql是否是线程安全的?如何使用ThreadPoolExecutor与pymysql实现多线程访问数据库
##########表结构############
CREATE TABLE `数据库名`.`表名` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `title` varchar(200) DEFAULT NULL COMMENT '文章标题', PRIMARY KEY (`id`) );
######## settings.py ########## #mysql-config MYSQL_HOST = 'localhost' MYSQL_DBNAME = 'scrapy' MYSQL_USER = 'root' MYSQL_PASSWD ='rootroot' MYSQL_PORT = 3306 ######## pipelines.py ########## # -*- coding: utf-8 -*- # Define your item pipelines here # # Don't forget to add your pipeline to the ITEM_PIPELINES setting # See: https://doc.scrapy.org/en/latest/topics/item-pipeline.html import json from logging import Logger from twisted.enterprise import adbapi from blogscrapy.db.dbhelper import DBHelper import codecs from logging import log from scrapy.utils.project import get_project_settings class BlogscrapyPipeline(object): def __init__(self): self.file = open('blog.json', 'a+', encoding='utf-8') self.db = DBHelper() def process_item(self, item, spider): content = json.dumps(dict(item), ensure_ascii=False) + "\n" self.file.write(content) self.db.insert(item) return item def close_spider(self, spider): self.file.close() ######## DBHelper.py ########## # -*- coding: utf-8 -*- import pymysql from twisted.enterprise import adbapi from scrapy.utils.project import get_project_settings class DBHelper(): def __init__(self): settings = get_project_settings() dbparams = dict( host=settings['MYSQL_HOST'], db=settings['MYSQL_DBNAME'], user=settings['MYSQL_USER'], passwd=settings['MYSQL_PASSWD'], charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor, use_unicode=False, ) # 将字典扩展为关键字参数 dbpool = adbapi.ConnectionPool('pymysql', **dbparams) self.__dbpool = dbpool def connect(self): return self.__dbpool def insert(self, item): # 封装insert操作 sql = "insert into article(title) values(%s)" query = self.__dbpool.runInteraction(self._conditional_insert, sql, item) query.addErrback(self._handle_error) return item def _conditional_insert(self, canshu, sql, item): # 传items的数据 params = (item['title']) canshu.execute(sql, params) def _handle_error(self, failue): print(failue) def __del__(self): try: self.__dbpool.close() except Exception as ex: print(ex)
一、数据库增删改操作 commit()方法:在数据库里增、删、改的时候,必须要进行提交,否则插入的数据不生效。 executemany():用来同时插入多条数据: import pymysql config={ "host":"127.0.0.1", "user":"root", "password":"LBLB1212@@", "database":"dbforpymysql" } db = pymysql.connect(**config) cursor = db.cursor() sql = "INSERT INTO userinfo(username,passwd) VALUES(%s,%s)" cursor.executemany(sql,[("tom","123"),("alex",'321')]) db.commit() #提交数据 cursor.close() db.close() execute()和executemany()都会返回受影响的行数: sql = "delete from userinfo where username=%s" res = cursor.executemany(sql,("jack",)) print("res=",res) #运行结果 res= 1 当表中有自增的主键的时候,可以使用lastrowid来获取最后一次自增的ID: import pymysql config={ "host":"127.0.0.1", "user":"root", "password":"LBLB1212@@", "database":"dbforpymysql" } db = pymysql.connect(**config) cursor = db.cursor() sql = "INSERT INTO userinfo(username,passwd) VALUES(%s,%s)" cursor.execute(sql,("zed","123")) print("the last rowid is ",cursor.lastrowid) db.commit() #提交数据 cursor.close() db.close() #运行结果 the last rowid is 10
二、数据库的查询操作 fetchone():获取下一行数据,第一次为首行; fetchall():获取所有行数据源 fetchmany(4):获取下4行数据 import pymysql config={ "host":"127.0.0.1", "user":"root", "password":"@@@@@", "database":"dbforpymysql" } db = pymysql.connect(**config) cursor = db.cursor() sql = "SELECT * FROM userinfo" cursor.execute(sql) res = cursor.fetchone() #第一次执行 print(res) res = cursor.fetchone() #第二次执行 print(res) cursor.close() db.close() import pymysql config={ "host":"127.0.0.1", "user":"root", "password":"@@@@@", "database":"dbforpymysql" } db = pymysql.connect(**config) cursor = db.cursor() sql = "SELECT * FROM userinfo" cursor.execute(sql) res = cursor.fetchall() #第一次执行 print(res) res = cursor.fetchall() #第二次执行 print(res) cursor.close() db.close() 可以看到,第二次获取的时候,什么数据都没有获取到,这个类似于文件的读取操作。 默认情况下,我们获取到的返回值是元组,只能看到每行的数据,却不知道每一列代表的是什么,这个时候可以使用以下方式来返回字典,每一行的数据都会生成一个字典: cursor = db.cursor(cursor=pymysql.cursors.DictCursor) #在实例化的时候,将属性cursor设置为pymysql.cursors.DictCursor import pymysql config={ "host":"127.0.0.1", "user":"root", "password":"LBLB1212@@", "database":"dbforpymysql" } db = pymysql.connect(**config) cursor = db.cursor(cursor=pymysql.cursors.DictCursor) sql = "SELECT * FROM userinfo" cursor.execute(sql) res = cursor.fetchall() print(res) cursor.close() db.close() 这样获取到的内容就能够容易被理解和使用了! 在获取行数据的时候,可以理解开始的时候,有一个行指针指着第一行的上方,获取一行,它就向下移动一行,所以当行指针到最后一行的时候,就不能再获取到行的内容,所以我们可以使用如下方法来移动行指针: cursor.scroll(1,mode='relative') # 相对当前位置移动 cursor.scroll(2,mode='absolute') # 相对绝对位置移动 第一个值为移动的行数,整数为向下移动,负数为向上移动,mode指定了是相对当前位置移动,还是相对于首行移动 sql = "SELECT * FROM userinfo" cursor.execute(sql) res = cursor.fetchall() print(res) cursor.scroll(0,mode='absolute') #相对首行移动了0,就是把行指针移动到了首行 res = cursor.fetchall() #第二次获取到的内容 print(res) #运行结果 [{'id': 1, 'username': 'frank', 'passwd': '123'}, {'id': 2, 'username': 'rose', 'passwd': '321'}, {'id': 3, 'username': 'jeff', 'passwd': '666'}, {'id': 5, 'username': 'bob', 'passwd': '123'}, {'id': 8, 'username': 'jack', 'passwd': '123'}, {'id': 10, 'username': 'zed', 'passwd': '123'}] [{'id': 1, 'username': 'frank', 'passwd': '123'}, {'id': 2, 'username': 'rose', 'passwd': '321'}, {'id': 3, 'username': 'jeff', 'passwd': '666'}, {'id': 5, 'username': 'bob', 'passwd': '123'}, {'id': 8, 'username': 'jack', 'passwd': '123'}, {'id': 10, 'username': 'zed', 'passwd': '123'}]
在python的文件操作中支持上下文管理器,在操作数据库的时候也可以使用:
import pymysql config={ "host":"127.0.0.1", "user":"root", "password":"LBLB1212@@", "database":"dbforpymysql" } db = pymysql.connect(**config) with db.cursor(cursor=pymysql.cursors.DictCursor) as cursor: #获取数据库连接的对象 sql = "SELECT * FROM userinfo" cursor.execute(sql) res = cursor.fetchone() print(res) cursor.scroll(2,mode='relative') res = cursor.fetchone() print(res) cursor.close() db.close() #运行结果 {'id': 1, 'username': 'frank', 'passwd': '123'} {'id': 5, 'username': 'bob', 'passwd': '123'}