详解MySqlBulkLoader的使用

mysql数据库:最近要写一个服务,跨库数据同步,目前数据量大约一万,以后会越来越多,考虑到扩展性,数据的插入操作就采用了mysqlbulkloader 。本文分两部分来写,第一部分写一下mysqlbulkloader的使用,第二部分记录使用过程中出现的问题 。
一、mysqlbulkloader的使用 我们先来定义个数据表student,表结构如下:
创建一个core控制台项目,相关代码如下:
入口代码:
【详解MySqlBulkLoader的使用】using system;using system.collections.generic;namespace mysqlbulkloaderdemo{class program{static void main(string[] args){//装载30个数据list<student> stulist = new list<student>();for (int i = 0; i < 30; i++){stulist.add(new student{guid = guid.newguid().tostring(),name = "qxh",age = new random().next(1, 30)});}//调用mysqlbulkloader,往student表中插入stulistint insertcount = mysqlbulkloaderhelper.bulkinsert<student>(stulist, "student");console.writeline($"成功插入{insertcount}条数据");console.readkey();}}}定义一个student映射类:
using system;using system.collections.generic;using system.text;namespace mysqlbulkloaderdemo{public class student{public string guid { get; set; }public string name { get; set; }public int age { get; set; }}}定义一个mysqlbulkloaderhelper类,用于存放相关方法:
using mysql.data.mysqlclient;using system;using system.collections.generic;using system.componentmodel.dataannotations.schema;using system.data;using system.io;using system.linq;using system.text;namespace mysqlbulkloaderdemo{public class mysqlbulkloaderhelper{const string connectionstring = "server=localhost;port=3306;user=root;password=123456;database=mysql;sslmode = none;allowloadlocalinfile=true";public static int bulkinsert<t>(list<t> entities, string tablename){datatable dt = entities.todatatable();using (mysqlconnection conn = new mysqlconnection()){conn.connectionstring = connectionstring;if (conn.state != connectionstate.open){conn.open();}if (tablename.isnullorempty()){var tableattribute = typeof(t).getcustomattributes(typeof(tableattribute), true).firstordefault();if (tableattribute != null)tablename = ((tableattribute)tableattribute).name;elsetablename = typeof(t).name;}int insertcount = 0;string tmppath = path.combine(path.gettemppath(), datetime.now.ticks.tostring() + "_" + guid.newguid().tostring() + ".tmp");string csv = dt.tocsvstr();file.writealltext(tmppath, csv, encoding.utf8);using (mysqltransaction tran = conn.begintransaction()){mysqlbulkloader bulk = new mysqlbulkloader(conn){fieldterminator = ",",fieldquotationcharacter = '"',escapecharacter = '"',lineterminator = "\r\n",filename = tmppath,local = true,numberoflinestoskip = 0,tablename = tablename,characterset = "utf8"};try{bulk.columns.addrange(dt.columns.cast<datacolumn>().select(colum => colum.columnname).tolist());insertcount = bulk.load();tran.commit();}catch (mysqlexception ex){if (tran != null)tran.rollback();throw ex;}}file.delete(tmppath);return insertcount;}}}}定义一个帮助类extentionhelper,主要是扩展方法:
using newtonsoft.json;using system;using system.collections.generic;using system.data;using system.text;namespace mysqlbulkloaderdemo{public static class extentionhelper{/// <summary>/// 将对象序列化成json字符串/// </summary>/// <param name="obj">需要序列化的对象</param>/// <returns></returns>public static string tojson(this object obj){return jsonconvert.serializeobject(obj);}/// <summary>/// 将json字符串转为datatable/// </summary>/// <param name="jsonstr">json字符串</param>/// <returns></returns>public static datatable todatatable(this string jsonstr){return jsonstr == null ? null : jsonconvert.deserializeobject<datatable>(jsonstr);}/// <summary>/// 将ienumerable't'转为对应的datatable/// </summary>/// <typeparam name="t">数据模型</typeparam>/// <param name="ienumberable">数据源</param>/// <returns>datatable</returns>public static datatable todatatable<t>(this ienumerable<t> ienumberable){return ienumberable.tojson().todatatable();}/// <summary>/// 判断是否为null或者空/// </summary>/// <param name="obj">对象</param>/// <returns></returns>public static bool isnullorempty(this object obj){if (obj == null)return true;else{string objstr = obj.tostring();return string.isnullorempty(objstr);}}/// <summary>///将datatable转换为标准的csv字符串/// </summary>/// <param name="dt">数据表</param>/// <returns>返回标准的csv</returns>public static string tocsvstr(this datatable dt){//以半角逗号(即,)作分隔符,列为空也要表达其存在 。//列内容如存在半角逗号(即,)则用半角引号(即"")将该字段值包含起来 。//列内容如存在半角引号(即")则应替换成半角双引号("")转义,并用半角引号(即"")将该字段值包含起来 。stringbuilder sb = new stringbuilder();datacolumn colum;foreach (datarow row in dt.rows){for (int i = 0; i < dt.columns.count; i++){colum = dt.columns[i];if (i != 0) sb.append(",");if (colum.datatype == typeof(string) && row[colum].tostring().contains(",")){sb.append("\"" + row[colum].tostring().replace("\"", "\"\"") + "\"");}else sb.append(row[colum].tostring());}sb.appendline();}return sb.tostring();}}}完整项目:mysqlbulkloaderdemo
运行结果如下:

二、mysqlbulkloader使用过程中出现的问题 上边已经完整了介绍了mysqlbulkloader的使用,但是在使用过程中出现了很多问题,主要集中在两方面,第一个方面是mysql数据库不支持加载本地文件数据;第二个方面是我的数据库在阿里云服务器上,而代码在本地,换句话说数据库和项目是分别放在不同服务器上的 。
1、mysql数据库不支持加载本地文件数据 (1)mysqlbulkloader原理?
我们结合sqlbulkcopy来说,用过sqlserver数据库的都熟悉sqlbulkcopy,很方便,可以直接将datatable中的数据批量导入到数据库 。与sqlbulkcopy不同,mysqlbulkloader也称为load data infile,他要从文件读取数据,所以我们需要将我们的数据集(如上边的list<student>)保存到文件,然后再从文件里面读取 。而对于mysql来说,为了数据库的安全,本地导入文件的配置没有开启,所以使用mysqlbulkloader批量导入数据库,就需要mysql数据库支持本地导入文件 。否则会出现以下错误:

the used command is not allowed with this mysql version

(2)解决方案
mysql数据库开启允许本地导入数据的配置,命令如下:
set global local_infile=1;//1表示开启,0表示关闭查看该配置的状态命令如下:
show variables like '%local%';
在项目里面的数据库连接字符串做设置
数据库连接字符串要加上”allowloadlocalinfile=true“,如下:
const string connectionstring = "server=localhost;port=3306;user=root;password=123456;database=mysql;sslmode = none;allowloadlocalinfile=true";

2、数据库和项目是分别放在不同服务器上 (1)问题描述
数据库和项目是分别放在不同服务器上,会造成以下问题:
system.notsupportedexceptionhresult=0x80131515message=to use mysqlbulkloader.local=true, set allowloadlocalinfile=true in the connection string. see https://fl.vu/mysql-load-data
(2)原因
因为项目中将数据集生成的文件保存在了项目所在的服务器,另一个服务器上的数据库在插入数据操作时,找不到数据集文件,导致的错误
(3)解决方法
方法很简单,因为数据库并不在项目所在的服务器,所以mysqlbulkloader中要设置local = true读取本地文件,进行导入 。具体代码如下:
(4)总结
如果你的项目和数据库在一台服务器上,那么就不会出现该问题 。
-- 展开阅读全文 --

    推荐阅读