用 canal 监控 binlog 并实现mysql定制同步数据的功能的实践

业务背景

写任何工具都不能脱离实际业务的背景。开始这个项目的时候是因为现有的项目中数据分布太零碎,零零散散的分布在好几个数据库中,没有统一的数据库来收集这些数据。这种情况下想做一个_ s ( ! : _ N大而全的会员中心系统比较困难。(这边是一个以互联网保险为中心的项目,保单,会员等数D ` ` 6据很零散的储存在好几个项目之中,并且项目之间的数据基本上是隔离的)。

现有的项目数据库是在腾{ A / n d讯云中储存,虽然= ~ T 0 n腾讯提供了数据同步功能,但是这样必须要表结构相同才行,并不符合我们的需求。所以需要自行开发。

需求

  • 需要能灵活配置。
  • 实时数据10分钟内希望可以完成同步。
  • 来源数据与目标数据可能& P 8 1 U C l V结构,字段名称不同。x = )
  • 增删改都可以同步。

技术选择

这个任务交给了我和另外一个同事来做。

同事的

同事希望可以通过ETL工具Kettle来做,这个东西我没有研究过,是同事自己在研究。具体过程不是$ V [ / j很清楚,但是最后是通过在mysql中设置更新,修改,删除的触发器,然后在Kettle中做了一个定时任务,实现了数据同步的功* T 4能,初步测试符合需求。但是必须要在 x Q数据库中设置触发器,并且a u y I -会有一个临时表,这一点我个人不是很喜欢。

我的

我是本着能自己写就自己写的原则,准备自己写一个。刚开始使用的是定时3 + W t L 2 5 : E任务比较两个库的数据差别,然后再同步数据。但是经过一定的数据测试后,发现在数据量大的时候,定时任务中的上一个任务没有执行完毕,下一个任务就又开始了。这样造成了两边数据不一致。最终这. ? / o x个方案废弃了。

后来通过研究,发现mysql的数据操作会记录在binlog中,这时就有了新的方案。可以通过逐行获取binlog信息,A m ` m U b L经过解析数据后,同步在目标库中。

既然有了方案,那么就开始做吧。

开始尝试1

首先要打开数据库的binlog功能,这一步比较简单,修改mysql的配置文件:/etc/mysql/mysql.conf.d/mysqld.cnf,添加V 8 y E | ; } W

server-id       w  $ * c . x; = 1
log_bin            = /v? ` t u ( W bar/log/mysql/mysql-@ 4 Hbin.log
expire_logs_days    = 10
max_binlog_si5 I H | [ze &nbL z . 1 @ V t M nsp;       = 100M
binlog_format   d ` 8;  &. p & & Cnbsp;    &P $ 9 = B p { _ Fnbsp;= ROW

然后重启mysql 就好了,具体每个参数的意思,搜索一下就P 7 X M = 5好了。这时候随意/ L 9 i Q r的对某一个数据库中的表做一下增删改,对应的日志就会记录在/var/q ? p @ C -log/mysql/这个文件夹下了。我们看一下这个文件夹里的东西:

用 canal 监控 binlog 并实现mysql定制同步数据的功能的实践

这里的* } H N R q d ;文件是没有办法正常查看的,需要使用mysql提供的命令来查看,命令是这个样子的:

1、查看

mysqlbinB % 3log mysql-bin.000002

2、指定位置查看

mysqlbinlog --start-position=} q * t ] ! & A J\"120\" --stop-position=\"332\j , Z" mysql-bin.000002

因为我们现在的binlog_format指定的格式是ROW(就在上面写的,还记得吗?),所谓binlo3 8 c l )g文件的内容没有办法正常查看,因为他是这个样子的:

用 canal 监控 binlog 并实现mysql定制同步数据的功能的实践

这时,我们需要对输出进行解K h V m

mysqlbinlog --base64-output=decode-rows -v mysqU @ G ) K t | zl-bin.000001

这时候,显示的结果就变成了:

用 canal 监控 binlog 并实现mysql定制同步数据的功能的实践

虽然还不是正常的Z d * Gsqlj w t `,但是好赖是有一定的格式了。

but自己来做E g + B I J R ] =解析的话还是] G 很麻烦,so~放弃这种操作。

继续尝试2

经过再次研究后,发现数据库中执行sql也是可以查看binlog的。主要有如下几条命P s | g v令:

--重置binlog
reset master;

--查看binlog的配置
show variables like \'%0 ~ A ) 2 hbinlog%\';

--查看所有的binlog
show binary&nb6 J p 3 Q ssp;logs;

--查看正在写入的binlog
show master status;

--查看指定binlog文件
show binlog evel ^ X h Jn+ J +ts in \'mysql-bin.000001\';

--查看指定binlog文件,并指定位置
show binl v 6 nog events in \'mysql-bin.000001\' from&nbA ` 0 c C N lsp;[pos]&nO P I Nbsp;limit [显示多少条];

按照上面的命令执行结果为:

用 canal 监控 binlog 并实现mysql定制同步数据的功能的实践

发现sql还是不能正常显示。这里的原因应该是binlog_format配置的原S d o F C因。将其修改为 binlog_format=Mi$ K k `xed后,完美解决。经过数据库中一通增删改后,显示的sql类似这样:

use `pay`; 0 3 =;/* Appv L U ) + W L l %licationName=DataGriph #  d ? i o 2018.2.5 */ UPDATE `pay`.`p_pay_log` t SET t.`mark_0` =&a T : . %nbsp;\'sdfsdf\' WHERW G p j / b -E t.`id`&nb% b B X bsp;Lz B I k PIKE \'342\' ESCAPE \( _ $ x'#\'

现在似乎已经可以开始写数j v t B 0 H (据同步了,只要在启动的时候获取当正- G h 在使用的是哪一个日志文件,记录binlog的位置,然后一点一点向下执行,解析sql就好了。但是在这个过程中,我发现阿里巴巴有一款开源的软件可以用。就是标题上说道的:canal。看了一下网站上的介绍,简直美滋滋。

a ^ ` ^ t . o =的文档y 0 s 7 5 A & 2 ]和代码地址在这里:htt[ a 8 ] Y Z H ~ps://github.com/alP + ? Tibaba/canal,大家可以看一下。现在就准备用这个来完成我所需8 5 ~ L $ % y要的功能。

正式开始写

首先看一下介绍,canal是需要单独运行一个服务的,这个服务具体的配置还是比较简单的。它的作用我自己理解就是监控binlog,然后根据自己的需要获取binlog中一定量的数据。这个数据是经过处理的,可以比较方便的知道里面的具体信息。比如那些数据发生了变动,每列数据的列名是什么,变动前和z h v *变动后的值是啥I , / % Y | o W之类的。那么开始。

1.我的想法

1)项目启动的时候,开启canal的链接,以及初始C P y { .化一些配置E E y # 6 x

@Bean
public CanalConnector caA E [ : }nalConX 0 o g 8 ) x C onector() v Y ` Z;{
    CanalConnector connector = CanalConnectors.newSingleConnector(
  d ^ S ( [ ^ @ { T V y 8 X      e n K x w 6;   //对应ca| i x c Y v Mnal服务的链接
      V % t U $ ! `      new InetSocketAddress(canalConfl } B.getIp(), canalConfM b C O q k z.getPort()),
 &n2 a rbsp;          //链接的目标,这里对应canal服务中的配置,需要查阅文档
            ca2 b K dnalConf.getDestination(),
&nbs P : : 7 I &p;         u Y a } 5  //不知道是什4 8 ! W $ *么用户,使用“”
            canalConf.getUser(),
        c r L l O R G D 2    //不知道是什么密码,使用“”
    &8 $ : 0 k F i 0nbsp; I : ^ * @ ! H /;     &nz R p b j E B d 1bsp;canalConf.getPassword()
 &nbsU V Y 6 !p;  );
    return connector;
}

2)先开启一个线程,里面写一个死循环,用于从canal的服务中获取t D l w A _ U binlog中的消息。这个消息类是:com.alibaba.otter.canay F 7 ]l.protocol.Message。

Message&nbK O H  4 * =sp;messaz C u E D C } ige& 1 $ { _ Q ` }nbsp;= connector.getWithoutAck(100);
  • connector:canal链接的实例化对象。
  • connector.getWithoutAck(100):} G ~ u ^ /从连接中获取100条binlog中的数据。

3)取出Message中的事件集合,就G s a W | 9 J 5 R是binlog中的每一{ 8 c O条数据。将类型为增删改的数据取出,之后每一条数据放在一个线程w u ( / R [ 5中,用线程池去执行它。

List<Entry>% V T & MY ! q ( r;entries = message.getEntries();

message.getEntries():从链接中获取的数据集合,每一条代表1条binlog数据

4)在每p ` ;一个线程中,取出Entry中G s ! ; 6 * &的数据,根据其类型拼接各种sql,并执行。

Header header = entryM k o I !.getHeader();
//获取发生变化的表名称,可能会没有
String tableName =&( L = Y y ( vnbsp;heade: - G u Sr.getTableName();

//获取发生变化的数据库名称,可能会没有
String schemaName = header.getSchemaName(@ Y p n l d);

//获取事件类型
Event* U gType eventType = rowC i J C zhange.getEventType();
/**
这里我们只是用其中的三种类型:
    EventType.DELETE 删除
 &nb4 V T O } n - (sp;  j g u P $ e J r;EventType.INSERT 插入
  &H | ? ]nbsp; EventType.UPD@ v g 2 ! o F 6 WATE 更新
*/
//获取发生变化的数据
RowChange rowChange = CanalEntry.RowChange.parseFrof | Pm(entry.getStoreValue());

//遍历其中的数据
int rowv | DDatasCount = rowChange.getRowDatasCount();
for&7 c P ` Q @ ynbsp;(int i = 0; i < rowDatasCount; H P u H ii++)&nz c = ? 7bsp;{
    //每一行中的K 6 @ B D数据
    RowData rowData = X s } 1 8 ?;rowCt I J T n { e Shange.getRowDatas(i);
}

//获取修改前的数据
List<Colume ? J %n> before = rowData.getBeforeColumnsList();

//获0 x 2 G取修改后的数据
List<Column> a! q 7 v | $ ;fter = rowData3 o Q _ z e.gee N s ktAfterColumnsList();

Column中有一系列方法,比如是否d @ y U 4 6 n发生修改,时候为key,是否是& y J j =null等,就不在细说了c . F m ]。扩展:阿里Canal框架(数据同步中间件)初步实践

2.万事具备. 7 &,可以开始写了

1)这里先写一个线程,用于不停的从canal服务中获取消息,然后创建新的线程并让其处理其中的数据。代码如: a G t下:

@Override
public void run()&nbsb d 2 P & wp;{
    while (true) {
        //主P P Q ]要用于在链接失败后用于再次尝试重新链J k A o W
        try {
  &H - ( U t Dnbsp;         ig Q C A ` ^f&nbg x ( R _ ) %sp;(!run) {

  t J b | _ |;{ 7 & ~ G } D E X    y ? L      &nbs2 5 / o m `p; X a l / . % F  //打开链接,并设置 run=true
 &3 ? gnbsp;   &n. ; 5 P p t z }bsp;     &nb` 6 ! 9 a % M ( Asp;    startCanal();
&nbs_ L ) %p;        &nbQ ( P B P $sp; &W = P d 4 W q ^ {nbsp;}
     &nb4 ; jsp;&nbsB & % i Q M Sp; } catch (Excepti^ 2 O i 5on e) {

&E M ~ 2 I fnbsp;           System.err.println(\"连接失败,尝试重新链接。。。\");
        &nW ) R M r E 9 @bsp; r 1 _ p r v;  threadSleep(3 * 1000);
      &nbQ p Z I s a Rsp; }
   &nb3 X 2 ~ 3 N ~ 2 sp;    System.err.w h Bprintln(\"; Z / p链接8 , n A成功。。。\");
    M : r;    //不停的从CanalConnector中获取消息
        try {
          Z K L + U R N  while&n` V - E 4 * 6 o Qbsp;(run) {

  &nbZ ^ U 3 q f #sp;  &nbH } e u F 5 ~sp;        , r Z  //获取一定数量的消息,这里为线程池数量3
   &n| 2 h b ] W D ebsp;       &nr F v n ^bsp;    Message message = con? m r O E } enector.getWithoutAck(baW / | u O J ` Z %tchSize * 3);
        &n| L ` $ ; 1bsp;       long id = messaZ X ~ ! Q f k (ge.getId();

      & ^ 1 j W . y;    &nk @ X u tbsp;     //处理获f V A v 0 W b 7取到的消息
       &nb^ 9 4 Gsp;      &nbh 5 )sp; process(message);
     &nbsZ u 0 a d !p;          connector.ack(id);
      &nv _ y ^bsp;     }
        } catch (Exception e) {
   + g u J R ,  &. ; ^nbsp; &nbsV 6 + ; cp;    System.err.println(e.getMessage());
       o ) Q G ^ : e u 4; }e $ m * _ : s finally {
    &nbsQ g a &p;    &nB 2 p y W U t lbsp;  . | ) N z _ j//如果发生异常,最终关闭连接,并设置ruq Q G ]n=false
            stopCanaJ E s r ] ,l();
      &nbs3 m T ; ] @p; }
    }

}u c ` -

void&nb= E Hsp;process(Message message) {
  c D ; z + l 0 h g  List<Entry>&nb] j C 5 $ b x Fsp;entries = message.getEntries();
    if (entries.size() <F { @ = } 4;= 0)| P 8 H o U {
        return;
    }
&nbs+ X 0 C ( 5 @ L =p;   log.info(\P v u * P 7"process message.entri^ P A Ges.size:{}\", entries.size());
    for (Entry enm K C g gtry 1 E R;: entrt K L P q j S I }ies): ( [ U R S #&nbsA + 1 p W }p;{
        Header&nbJ ? ? o ^ 5sp;header = entry.getHeaZ 4 . P k 9 % 2 -der();
       &6 J `nbsp;String&% J m T + z p 2nbsp;tableName = header.getTableName();
        String schemaName = header.getSchemaN) 9 ! o d } Bame();

  &nd d Kbsp;     //这里判断是否I H J v ` C } @ *可以取出数据库名称和表名称,如果不行,跳过循环
     w Q 5 , m x 0 S;&nbu f 7 1 Osp;  if (StringUtils.isAllBlank(tableName, schemaName)) {
            continue;
        }

        //创建新的线程,并执行
  &n+ - 7 e / w p ( ~bsp;  &nbs1 ( ; p bp;  jobList.str& K Q L i N X deam()
       a ! . : H 3 6;     ~ W ^ ] ~ J K ;  &n i C ; e gbsp; .filter(@ h i ^job -&gs T 6 u , . W Y %t; job.isMatches(tableName, schemaName))+ L b z * t
     K Z S *           .forEach(job ->  Q =;executorService.execute(job.new) . N S e 0 z /Task(entry)));
    }
}) m _ O # * 9

这里的jobList是我自己定义List<JoF ) nb>,代码如下:

package com.hebaibai.miner.job;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
im2 3 _po1 u y f b S }rt com.google.protobuf.InvalidProU 6 PtocolBufferException;
import lombok.Data;
import lombok5 U y 6 , - / :.extern.sn ] qlf4j.Slf4j;
impG L 7 9 L B @ m sort org.springframeworq V ! B U z *k.jdbc.core.JdbcTemplate;

import static com.alibaba.otter.canal.protocol.CanalEntry.Entro % cy;

@Slf4j
@Data
public abstract class Job {


    /**
     * 数据库链接
 @ X b l;    */
    protected JdbcTemplate jdbcTemplate;

    /**
  &nb0 U K } A f % &sp;&n; x i $ sbsp; *&nbsS r g F _ 3p;额外配置
     */
    protected JSONObject prop;

  g j * d ^ 5;  /**
    J 8 ) I ! (;&nl U t ! cbsp;* 校验目标是否为合适的数据库和表
     *
&nbsD m @ ^ Qp;    * @param table
 / 1 u ` 7    * @param database
   &nj : - + W x Cbsp; * @returns R x
     */
    abstraU P $ e w Z ~ Qct public boolean i $ - ssMatches(String table, Stri8 n | Q ]ng database);

    /**
     * 实例化一个Runnable
     *
     * @param entry
     * @retuH F ,rn
  # P F . U   */
 : Z 2 q { 4 ^ N   abstract p, h cublic Runnable newTask(final Entru d 0 : l m %y entry);


 u } . &nbs: ^ 5 x a $ ;p;  /**
    &nbs ~ *sp;* 获取RowChange
     *
     * @param entry
     * @return
     */
    protected CanalEntry.RowChange getRowChaE g ZnS S ( i fge(Entry&nbT k 9 q P +sp;entry) {
   &s ; 7 A A i Znbsp;    try {E %
            return Ca4 { r k D ;nalEntry.RowChange.parseFromd W ! k 1 g(entry.getStoreValue());
  &nbsj ] 6p;     } catch (Invalid+ m LProtocolBufferException e) {
          &n* @ B Y / E ; 0 .bsp;&nr ` s 6 - x t Qbsp;e.prE 3 I ( w n } NintStackTrace();
        }
      Q t u b Q  return null;
   J O ^ S ! }

}

jobList里面放的是Job的实现类。

3.写一个Job的实现类,并用于同步表,并转换字K Y * h f y { (段名称。

因为需求中要求两个同步的数据中可能字段名B + U ) E f _称不一致,所以我写了一个josn用来配置两个表的字段对应关系:

//省略其他配置
\"prop\": {o 6 r o /
//来源数据库
  \} ! d j"database\": \"pay\",
//来源表
  \"table\": \"p_pay_mV U ! s Msg~ N d d U Y\",
//目标表(目标库在其他地方配置)
  \"target\": \"member\",
//字段对应关系
//key  :来源表的字段名
//value:目标表的字段名
  \"mapping\": {
    \e M Z I V {"id\": \"idO 4 t\",
    \"mch_code\": \ G P W + "mCode\",
   J , : F L b D \"send_type\": \"mName\",
    \"order_id\":&r A $ x xnbsp;\"X L W ; 2 [ @phone\",
 &no n t R 6 Q V , Cbsp;  \"created_time\":&O D v & ; R Jnbsp;\"create_time\",
   &; G U b @nbsp;\"creator\": \"remark\"
  }
}
//省略其他配置

下面是全部的代码,主要做的就是取出变动的数据,按照对应的字段名重新拼装sql,然后执行就好了,不多解释。扩展:基于canal~ ? ^ V进行日志的订阅和转^ a L Y _ y k

package q  F;com.hebaibai.miner.job;

import com.alA ! ~ d S W fibaba.fastjson.JSONObject;
import lombokd p d j ^ 4 6 S q.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.util.Array! T k H I B ] ;List;
import java.{ [ & 1 r x i iutil.List;
imh ; y , l hport java.util.stream.Collectors;

import static com.alibaba.ot2 k % F s m T $ Bter.canal.protocol.CanalEntry.*;

/**
 ** 4 v F e R 7 单表同步,表的字段名称可以不同,类型需要一致= D = ^ C T ^ D
 * 表中需要有id字段
 */
@S ? { x N } xuppressWarnings, w Q L M(\"ALL\")
@Slf4j
public class TableSyncJob extends Job {


&n{ t U %bsp;   /**
     * 用于Q t ! r 2 a校验是否适用于当前的配置
  n [ X ; ^  &n3 4 k Fbsp;*
 o ! / L u 1 Y g K    * @param table
     * @param database
     *&Y F 8nbsp;@ret! D N b ) % vurn
     */
    @Override
    public boolean isMatches(String table, P E Y F PString database) {
&n, ] ^ ) - tbsp;       return prop.getString(\"database\").equals(databb J / { - J uase) &&
    : [ /    &nbi * $sp;     &nb) h # H Ssp; prop.? : Y 5 ~getString(\"table\").equals(tablel V $ s T);
    }

    /**
     n R `;* 返回一个新的Runnable
   y 3 8 * G t  *
 s ( [ j S q;    * @param entry
     * @return
   @ - b 0 x *;  */
    @Override
  &nbsX ) o gp; public Runnable newTask(final Entry entry)&+ F & R 5nbsp;{
&nb= M W G l _ ) C Rsp;  &5 3 c o V Hnbsp;    return () ->[ V {
            RowChange rowChange =&nbs$ 3 _ 0 K / -p;super.getRowChange(entry);
       &nbZ 9 [ ; b 2 ; Wsp;    if (rowChange == null) {
         &nbsl I U S % d ` 3 @p;   &/ n } # @ Dnq ` a .bsp;  return;
&ns m xbsp;         &nbs} . X N c % { .p; }
          &nbs9 ; A P Wp; EventType eventType = rowChange.gN - E 8 ]etEvenT : S & { 3 ) J |tType();
 &nb P 6 D Nbsp; s E 3 h ^ & E @   &n* * { X ? . U - Tbsp;     int rowDatasCount = rowChange.getRowDatasCount();
            for (int&nbj h z rsp;i = 0;&nI _ l W J 4 $ jbsp;i < rowDatasCoun( d 5 ; R 2 t;&nbsw R | S r ) f N `p;i++) {
   &Y F Y Unbsp; n @ X t w h 5;      e a H;&nbv o v l W { Wsp;    RowData rowData = rowChange.getRowDatas(i);
     Z ( ; O 8 $ m {           if (eventType == EventType.DELETE) {
     m 4 z L dL ; k );&nbsU 5 F 7 W ; }p;    &n9 | K d S * 0 ? ob6 c { = s t Psp;  &n- / R e ; n k hbsp;     delete(ros q 4 6wData.getBeforeColumnsList());
        &g ] K 2nbsp;     &nbsk c 2 s i x - pp;{ W  $ E 3 Z ? G}
   q $ } v 6 , x f;           4 X I O ~ 4 I d  if (eventType ==&nbr m ! _ U h x osp;EventType.INSERW V W /T) {
            1 R 0;&D : s , a v %nbsp;       R a . O V ) e;insert(rowDr 2 G K d +ata.getAfterColumnsList());
                }
   &nbH I + ~ E _ C Xsp;       &nba 7 * S j 7sp;T # A L 3 s H    if (eventType ? B { ` Y ? $ E== EventType.UPDATE) {
 &nbY y O 0 & *sp;    X k Z      g T 1 x U C       + k Y Z e % s update(rowData.get6 z I Y SBeo + a qforew x _ColumnsLiZ D * = 3 {st(), rowDV H 2 C Zata.getAfterColumnsList());
&nbQ n Z spt + w = 8 C B;@ V % ] x   &u 7 enbsp;       W d x;    }
&nbsf - o r R j 9p;           }
    / X o M ; f y;  &j e | b ~ y N O [nbsp[ i @ S; };
  &q ~ o / ) vnbsp; }

    /**
  &nb: k % $ @ _ Zsp;  * 修改后的数据
     *
   &L c $ 6 , K p snbsp; * @param after
     */
&) S 6 OnbsR # h O 7 F ; ~p;  &nbs? + D X Sp;private void insert(Listi z @ ; u . M #<Column> a3 S N U ^fter) {
  &n. T F R B F ~ !bsp;     //找到改动的数据
   &nb) t p Psp;    List<Column> collect = after.stream().filter(column -> column.gz ! ,etUpdated() || column.getIsKey()).collect(Collectors.toList());
        //根据表映射关系拼装更新sql
 &nb9 q hsp; &nO 7 N _ i Obsp;    JSONObject : B 5 J t 2;mapping = prop.getJSONObject(\"mapping\");
        String target = prop. i = = X k ?.getString(\"target\");
       s { Q J _ 5 u v *; List<String> columnNames&nw S e d k J 9bsp;= new ArrayList<>();
   &nb. H : 1 w 9 H | /sp;&nbsw A Gp;   Listp - X * O 5<String> columnValu^ r t K Yes = new ArrayList<>();
        for (int i =&nbo ^ M 9sp;0; i&nbs+ L 3 ^ e / [p;<&nv 6 *bsp;collect.size(); i++) {
       &n# Q D Q p z ( 0bsp;   - H 2 W Column column = collect.get(i);
            if (!mapping.containsKey(column.getName())) {
  z 8 z T # L  &np T p k Y Nbsp;   r Z ~ s 3 W = C .    v 2 5 Q Q 7 [ 9 =;  &} b } &nbsp; continue;
            }
         . m s r = D , t K;   String name = mapping.u } W (getStrn N A P L Q s Ding(column.getu @ | e X oName());
    r 9 Y 6 e B ~      3 I E & d 8;  columnNames.add(name);
            if (column.get} h Q a f @IsNull()) {
            &nb+ Z R hspL | n # x 8;   columnValues.add(\"null\");
     &nbs} 8 9 t m # D p;      } else {
&nP o q J l qbsp;&nbsn 7 h v 3 &p;           &nm n Mbsp;  columnValues.add(\"\'\" + column.getValue() + \"\'\");
   &nbu w m D O Gsp;    &n} : z ! h L P 1 -bsp;   }
    &nbG ( + ( bsp;   }
&n. j B h &bsp;       StringBuilder sql = new StringBuilder();
&n$ r ] O i t t Dbsp;      ] h f } = sql.append(\"REPLACE INTO \").append(target).append(\; D I S ? 6"( \")
      &nbsP d 6 4 e , 2 l Zp;         .append(StringUtils.join(columnNames, \", 6 O l b 7\"))
&nbM P ? = h , C ! Isp;       o l + Q s m ~ &n- L + F S [ U 4bsp;      .append(\= P P } = U ^ 8") VALUES ( \"- 2 * s ! W)
&~ a l N 7 v 2 j !nbsp;        &r 1 X anby n .sp; &nb9 f E H R E p hsp;  &nbsX ; ` rp; .append(StringUtils.join(columnValues, \", \"))
         &nI F ( Kbsp;      .append(H $ r l [\");\");
        String sqlS{ ; a Y @tr = sql.toString();
  &nbsA { I i 3 & D 4 lp;     log.debug(sqlStr);
        jdbcTemp2 ) ?late.execute(sqlStr);
    - q % $;}

    /**
&; ) 4 rnbsp;    * 更新数据
  &: I G + 5 Rnbsp;  *
     * @param before 原始数据
     *&/ E 8 m ~ J G Ynbsp;@param after  更新后的数据
 &nbs= o % tp;&nbsN h |p;  */
    private void upd, t 6 vate(List<Column> beR ~ H : D e ~fore, List<Column>F - ~ after) {
        //找到改动的数据
 &nbs( e G D , Xp;   1 ] $ D X R ! - ~;   List<Column> updataCols = after.stream().filter(column -> column.getUpdated()).collect(Colln 8 Gectors.toList());
       &4 E k o l mnbsp;//找到之前的数据中的keys
        List` E m . , D a<Column> keyCols = before.stream().filter(column -> column.getIsKey()).ch 4 S n A e ;ollect(Collectors.toList());
        //没有key,执行更新替换
   &n4 , z r ubsp; , ? ] 2 W I a &nbs= n Op; if (keyCols.size() == 0) {
 I w  &nbJ T [ X y Hsp; &X 4 ! M Tnbsp;     &n- ^ 1bsp;return;
        }
&n& M p / D g F % sbsp;   H ; 6 R | d 4 k U;   &nbs e k O R N 5 Lp;//根据表映射关系拼装更新sH = # z $ G z k Tql
      &nL ? d M # mbsp; JSONObject mapping =d P )&nbd B Isp;prop.getJSONObject(\"mapJ G J * D ]ping\");
        String target = prop.getString(\"targete ] g 2 G Q\");
 &nbs@ & 0 G l cp; &n3 I } R %bsp;    //待更新数据
  &nbX m k : G 5 [sp;     List<String> updatas = new&6 C 2 i H g #nbsp;ArrayList&U E ` Tlt;>();
     &nbs, K ] o | Y b 7p;  for (int i =&] L N ~ !nbsp;0; i < updataCols.size(); i++) {
            Column updataCol&nbM O R $ = z S usp;= updataCols.get(i)X j I;
            if (!mapping.containsKey(updataCol; D @ ; ^ ; {.getName())) {
       + G x # i e S;  t L r s; &nbsk . v K j 2 D hp;     continue;
          &nk z _ dbsp; }
   &nbsO y o ~ p | 2 Np;   &q e j j # m *nbsp;    String&nS ` T hbsp;name = mapping.getString(updataCol.getName());
            if&nbI L 5sp;(updataCol.getIsNull()) {
         7 c J - 7 B w + A;       updatas.add(\"`\" + name + \"` = nullO ^ P @ H s 4 f W\");
            } elsz [ } I b se&nbS 0 K i q g L ysp;{Q D #
          &n/ C 3 Dbsp;    4 c a |; updaP - d +tas.add(\"`\" +&nbsE 0 9p;name + \"` = [ * , h b y +;\'( 3 E 1 y ^ F g\"H S h +&nbsH P T p;updataCol.getValue() + \"\'\");
            }
   &O g :nbs4 r q } = e -p;  &nbs4 _ I z b = ^ | Ap; }
        //如果没有要修改的数据,返回
    : M +;   &nj ) _ & b Gbsp;# t U if (updatas.size()&i m knbsp;== 0)D e 0 Z H h Q ( {
       H J P d     return;
  &n& R j Tbsp;     }
 . s 3 l : k d ;    . : _ X | x;   //keys
 &0 Q I ~nbsp;      List<String> keys&nbs( 1 8 2 [ 0p;= new ArrayList<>(C g Q);
        for* e x ! s (Column keyCol&nc h 6 R 1 T e 6 gbsp;: keyCols) {
&_ X & E U Mnbsp;   K N [ y i  &nbs: w E 9 M 9 $ `p;     String8 . z [ name = mapping.getString(keyCol.G J , _ j s @ , DgetName());
        % ; S g %;    keys.add(\"`\" + name + \"` = . A [ 6 E v U ,\'\" + keyCol.getValue() + \"\'\");
&nbsl a X | [ b , _p; - Q } U  &nbsM q q $ d ; j Wp;   }
&nb% ^ . } g 4 f *sp;  &s 6 | e + 6 ? J tnbsp; &A @ L R n Cnbsp;  StringB2 [ j c _ cuii o t T S 8 L KldeZ i C ^ k W /r sql = new StringBuilder();
        sql.append(\"UPDATE \").append(target).append(\# l F R E 5 # #" SET \");
   &~ 1 , A (nbsp;&nbE a msp; = R : - [ Q x 4 R;  sql.append(StringUtils.a 4 e ~join(updatas, \", \"));
&nbsk h op;      &t % Q n x G x nbsp;sql.append(\" WHERE&nbg v q Y fsp;\");
   &m 3 $ y | i s s tnbsp;    sql.append(StringUtils.join(keys, \"AND \"));
        String sq A y c ? i k a ZlStr = sql.toString();
        log.debug(sqlStr);
  &! u !nbsp;  &nl 9 B | & Tbsp;  jdbcTemplate.b N . 6 R f w 8execute(sqlStr);
&L f q qnbsp; p : !;  }

    /**
&F R e Z % $ v 6nbsp; &nb- B 0 -sp; &l X + | f hnbsp;* 删除数据
&nbsq Q t B A 0 + a Np;    *
  &nbN R ? 9 Ysp;  *&9 u z , C T K 6 vnbsp;@param before
     */
 &nbs+ / # n 4 u C ep;  private void dn r $ 0 I x belete(List<Column> before) | 2 Q e * ~ O % ];{
        //找到改动的数据
    &7 F u v w ~ Ynbsp;   List<Column> keyy b r _ ? #Cols = befot ) xre.stre: Y Kam().filter(column. | w J D ^ E -> column.getIsKey()).collect(Collectors.toList());
        if&nbV i N X U J 0sp;(keyCols.size()&) t J W P knbsp;== 0) {
&L % : I ` jnbsp;  &nL 2 J e ] - ebsp;       6 # k h ?; return;
  &n} Q z B W 3 E x Pbsp; &n & 1 w Q ~ !bsp;   * T } c s 0 j}
        /c ; _ M c T/根据表映射关系拼装更新sql
  b t R J;   &( d * 8 a [ @ : xnbsp; &z J C M # . R H *nbsp;JSONObject mapping = prop.getJSONObject(\"mapping\");
        J ) ` o I s;String target =&nbsT Q z _ G V ( L Fp;prop.getString(\"target\");
&nbp Z ) / ( Q t Asp;       StringBuild6 Y - V 6 der sql&nbk q Y v E =sp;= new StringBuilder();
        sql.append(\"DELETE FROM&n~ e @ : G p | Ibsp;`\").append(target).append(\8 Z ( n ,"` WHERE \");
        List<String> where =H e l Q j new ArrayList<>();
  &no p + e 8bsp;  &f F q = l A 2nbsp; &G ] % & = q jnbsp;for (Colum8 k Vn column : keyCols) {
     &nbs& = = 8 Tp;      String&c D & 1 $nbsp;name = mapping.getString(column.getName());
 ) 2 0 ! ` k E( 0 c ^ y Y {&] % Xnbsp;      &nbs} E xp; J 1 Q J v # q D 8 where.add(name + a h !\" = \'\" + column.getV* y h a K walue() + \"\' \");
&n* u q Q , y u D )bsp;       }
&v n /nbsp;     &nbh A x 9 _sp; sql.appenG X u ^d(StringUtils.join(where, \"and \"));
    &nbm ^ % Fsp;&nby u 8 q i Lsp;  String sq* Q a r ~ b ilStr =&nbsh e 2p;sql.toString();
 &nb, , m F w ) G %sp;      log.debug(sqlStr);
        jdbcTemplate.execute(sqlStr);
    }
}

Java知音,专注于Java实用文章推送,不容错过!

来源:cnblogs.comt A 1 1 D & I O 8/hebaibai/U 0 & ap/10911899.html

上一篇

开发项目时如何选择区块链平台?我们分析了三个有趣的平台

下一篇

10万年后人类长相是怎样?科学家公布模拟图,网友:幸好我死了

评论已经被关闭。

插入图片
返回顶部