最近经常要将数据导入到mysql中,我写过一次后也是复制粘贴前面写过的,但老是经常忘记写过的放哪去了,索性整理下直接写到博客里面来

方法:

      1、使用 pymysql 库, 数据一条条插入,或者用Django ORM里面的方法,数据批量插入

      2、使用 pandas 库,一次性插入,也可批量插入

      3、使用 pyspark, 一次性插入(可以不用建表,但是表没有注释, 即 mysql 的 COMMENT,要注释的话可以建空表)

方法1:

mysql 首先创建表

CREATE TABLE `data_to_mysql` (
  `id` int(10) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `name` VARCHAR(30) DEFAULT NULL COMMENT '姓名',
  `sex` VARCHAR(20) DEFAULT NULL COMMENT '性别',
  `age` int(5) DEFAULT NULL COMMENT '年龄',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='数据导入mysql';

 

import pymysql
import pandas as pd

mysql_host = 'localhost'
mysql_db = 'test'
mysql_user = 'root'
mysql_pwd = '123456'
mysql_table = 'data_to_mysql'

class MYSQL:
    def __init__(self):
        # MySQL
        self.MYSQL_HOST = mysql_host
        self.MYSQL_DB = mysql_db
        self.MYSQ_USER = mysql_user
        self.MYSQL_PWD = mysql_pwd
        self.connect = pymysql.connect(
            host=self.MYSQL_HOST,
            db=self.MYSQL_DB,
            port=3306,
            user=self.MYSQ_USER,
            passwd=self.MYSQL_PWD,
            charset='utf8',
            use_unicode=False
        )
        print(self.connect)
        self.cursor = self.connect.cursor()


    def insert_mysql(self, data_json):
        """
        数据插入mysql
        :param data_json:
        :return:
        """
        sql = "insert into {}(`name`, `sex`, `age`) VALUES (%s, %s, %s)".format(mysql_table)
        try:
            self.cursor.execute(sql, (data_json['name'], data_json['sex'], data_json['age']))
            self.connect.commit()
            print('数据插入成功')
        except Exception as e:
            print('e= ', e)
            print('数据插入错误')

def main():
    mysql = MYSQL()
    df = pd.DataFrame({
        'name': ['戴沐白','奥斯卡','唐三','小舞','马红俊','宁荣荣','朱竹清'],
        'sex': ['男', '男', '男', '女', '男', '女', '女'],
        'age': [23, 22, 21, 100000, 20, 20 ,20]
    })
    # orient='records', 表示将DataFrame的数据转换成我想要的json格式
    data_json = df.to_dict(orient='records')
    for dt in data_json:
        mysql.insert_mysql(dt)


if __name__ == '__main__':
    main()

 

方法2

mysql 首先创建表

CREATE TABLE `data_to_mysql` (
  `id` int(10) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `name` VARCHAR(30) DEFAULT NULL COMMENT '姓名',
  `sex` VARCHAR(20) DEFAULT NULL COMMENT '性别',
  `age` int(5) DEFAULT NULL COMMENT '年龄',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='数据导入mysql';

import pandas as pd
from sqlalchemy import create_engine

mysql_host = 'localhost'
mysql_db = 'test'
mysql_user = 'root'
mysql_pwd = '123456'
mysql_table = 'data_to_mysql'



def main():
    engine = create_engine('mysql+pymysql://{}:{}@{}:3306/{}?charset=utf8'.format(mysql_user, mysql_pwd, mysql_host, mysql_db))
    df = pd.DataFrame({
        'name': ['戴沐白','奥斯卡','唐三','小舞','马红俊','宁荣荣','朱竹清'],
        'sex': ['男', '男', '男', '女', '男', '女', '女'],
        'age': [23, 22, 21, 100000, 20, 20 ,20]
    })
             #  表名
    df.to_sql(mysql_table, con=engine, if_exists='append', index=False)
    """
        to_sql参数:(比较重要)
            if_exists:表如果存在怎么处理
                    append:追加
                    replace:删除原表,建立新表再添加
                    fail:什么都不干
             chunksize: 默认的话是一次性导入, 给值的话是批量导入,一批次导入多少
             index=False:不插入索引index
             dtype 创建表结构
               需要导入 import sqlalchemy
               dtype = {'id': sqlalchemy.types.BigInteger(),
                 'name': sqlalchemy.types.String(length=20),
                 'sex': sqlalchemy.types.String(length=20),
                 'age': sqlalchemy.types.BigInteger(),
                 })
             
    """
if __name__ == '__main__':
    main()

 

方法3

spark不能直接读Excel等数据,可以先将数据写入到json文件中

import pandas as pd
import json
def data_to_json():
    """
    数据写入json
    :return:
    """
    df = pd.DataFrame({
        'name': ['戴沐白', '奥斯卡', '唐三', '小舞', '马红俊', '宁荣荣', '朱竹清'],
        'sex': ['男', '男', '男', '女', '男', '女', '女'],
        'age': [23, 22, 21, 100000, 20, 20, 20]
    })
    data_json = df.to_dict(orient='records')
    for dt in data_json:
        dt_string = json.dumps(dt, ensure_ascii=False)
        with open('data.json', 'a+', encoding='utf-8') as f:
            f.write('{}\n'.format(dt_string))


if __name__ == '__main__':
    data_to_json()

 

from pyspark import SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

mysql_host = 'localhost'
mysql_db = 'test'
mysql_user = 'root'
mysql_pwd = '123456'
mysql_table = 'data_to_mysql'

def sparkConfig():
    """
    spark的配置
    :return:
    """
    spark = SparkSession\
        .builder\
        .appName('data_to_mysql')\
        .master('local[*]') \
        .getOrCreate()

    sql = SQLContext(spark)
    return sql, spark

def readJson(spark):
    data = spark.read.json('data.json')
    data.show()
    data.printSchema()
    print(dir(F))

    # 两种方法都可以更改列名
    # data.select(F.col('old_field1').alias('new_field1'), F.col('new_field2').alias('new_field2')).show()

    # data = data.withColumnRenamed('old_field', 'new_field')
    data.show()

    url = 'jdbc:mysql://{}:3306/{}?useUnicode=true&characterEncoding=utf8'.format(mysql_host, mysql_db)  # 有中文,需处理中文乱码
    table = mysql_table
    auth_mysql = {"user": mysql_user, "password": mysql_pwd}


    data.write.jdbc(url, table, mode='append', properties=auth_mysql)
    """
        * `append`: 数据追加
        * `overwrite`: 如果数据存在则覆盖
        * `error`: 如果数据已经存在,则抛出异常。
        * `ignore`: 如果数据已经存在,则忽略此操作。
    """


def main():
    sql, spark = sparkConfig()
    readJson(spark)

if __name__ == '__main__':
    main()

 

Logo

更多推荐