来源
课件地址:https://manongbiji.oss-cn-beijing.aliyuncs.com/ittailkshow/it300/download/ppt_all_in_one.zip
【132】写了份傻瓜文档,20 分钟上手阿里 Canal 数据同步中间件
来源:https://www.bilibili.com/video/BV1jY41177Qh/?spm_id_from=333.788
文档地址:
链接:https://pan.baidu.com/s/13aYcOmk6yUhbILwKHSXQjA
提取码:jm9y
概述
本文演示如何 Canal 如何接入 MySQL 实现数据写操作监听。 关于 Canal 如何接入消息队列、Redis、Canal 的高可用 HA 后文继续讲解。
开发环境
名称 | 版本 |
---|---|
操作系统 | Windows 10 X64 |
VMware® Workstation 12 Pro | 12.0.0 build-2985596 |
CentOS7 | CentOS Linux release 7.8.2003 (Core) |
Linux | 3.10.0-1127.el7.x86_64 |
mysql | 8.0.28,安装在机器 192.168.0.191 |
Canal | 1.1.5,安装在机器 192.168.0.192 |
安装 MySQL 8.0.28(192.168.0.191)
firewall-cmd --zone=public --add-port=3306/tcp --permanent
firewall-cmd --reload
cd /home/
wget --no-check-certificate https://manongbiji.oss-cn-beijing.aliyuncs.com/ittailkshow/canal/download/world.sql
wget --no-check-certificate https://repo.mysql.com/mysql80-community-release-el7-5.noarch.rpm
yum localinstall -y mysql80-community-release-el7-5.noarch.rpm
#自动安装 MySQL 8.0.28
yum install -y mysql-community-server
调整配置文件
sudo cat >> /etc/my.cnf <<-'EOF'
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog_do_db=world
EOF
#启动 mysql
systemctl start mysqld
获取初始密码
grep 'temporary password' /var/log/mysqld.log
2022-04-09T04:55:23.758813Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: Ke1Lo*SvUQg#
设置相关账户密码
#登录 mysqld
mysql -uroot -p
#修改 root 密码
alter user 'root'@'localhost' identified with mysql_native_password by 'asAS123456!';
#mysql 降低密码强度
set global validate_password.policy=0;
set global validate_password.length=4;
#创建 canal 同步账户
create user canal@'%' identified with mysql_native_password by 'canal';
#授权 canal 用户允许远程到 mysql 实现主从复制
grant select,replication slave,replication client on *.* to 'canal'@'%';
create user remote@'%' identified with mysql_native_password by 'remote';
grant all privileges on *.* to remote@'%';
#初始化数据库
source /home/world.sql
安装 Canal-Server(192.168.0.192)
安装 JDK
yum ‐y install java‐1.8.0‐openjdk‐devel.x86_64
sudo cat >> /etc/profile <<-'EOF'
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$JRE_HOME/lib:$PATH
EOF
source /etc/profile
echo $JAVA_HOME
下载 canal-deployer 最新版
# 下载脚本
wget --no-check-certificate https://manongbiji.oss-cn-beijing.aliyuncs.com/ittailkshow/canal/download/canal.deployer-1.1.5.tar.gz
mkdir /home/canal
tar zxvf canal.deployer-1.1.5.tar.gz -C /home/canal
修改配置文件
vim /home/canal/conf/example/instance.properties
修改下面内容
# 调整 serverId
canal.instance.mysql.slaveId=10
#master 地址
canal.instance.master.address=192.168.0.191:3306
#关闭 tsdb
canal.instance.tsdb.enable=false
#确认 canal 同步用的用户名,密码
canal.instance.tsdb.dbUsername=canal
canal.instance.tsdb.dbPassword=canal
启动服务
cd /home/canal/
sh bin/startup.sh
添加端口
#canal admin 端口
firewall-cmd --zone=public --add-port=11110/tcp --permanent
#canal 监听端口
firewall-cmd --zone=public --add-port=11111/tcp --permanent
#canal 指标监控端口
firewall-cmd --zone=public --add-port=11112/tcp --permanent
firewall-cmd --reload
查看是否启动成功
cd /home/canal/logs/canal
tail canal.log
结果
2022-04-09 13:00:25.702 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2022-04-09 13:00:26.024 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2022-04-09 13:00:26.073 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2022-04-09 13:00:26.645 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.122.1(192.168.122.1):11111]
2022-04-09 13:00:35.164 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
开发数据监听程序-canal-client
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itlaoqi</groupId>
<artifactId>canal-client</artifactId>
<version>1.0-SNAPSHOT</version>
<repositories>
<repository>
<id>aliyun</id>
<name>aliyun</name>
<url>https://maven.aliyun.com/repository/public</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>aliyun</id>
<name>aliyun</name>
<url>https://maven.aliyun.com/repository/public</url>
</pluginRepository>
</pluginRepositories>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency>
</dependencies>
</project>
编写数据监听程序
package com.itlaoqi;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
public class AD {
public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
//TODO 获取连接
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.0.191", 11111), "example", "", "");
while (true) {
//TODO 连接
canalConnector.connect();
//TODO 订阅数据库
canalConnector.subscribe("world.*");
//TODO 获取数据
Message message = canalConnector.get(100);
//TODO 获取Entry集合
List<CanalEntry.Entry> entries = message.getEntries();
//TODO 判断集合是否为空,如果为空,则等待一会继续拉取数据
if (entries.size() <= 0) {
System.out.println("当次抓取没有数据,休息一会。。。。。。");
Thread.sleep(1000);
} else {
//TODO 遍历entries,单条解析
for (CanalEntry.Entry entry : entries) {
//1.获取表名
String tableName = entry.getHeader().getTableName();
//2.获取类型
CanalEntry.EntryType entryType = entry.getEntryType();
//3.获取序列化后的数据
ByteString storeValue = entry.getStoreValue();
//4.判断当前entryType类型是否为ROWDATA
if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
//5.反序列化数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
//6.获取当前事件的操作类型
CanalEntry.EventType eventType = rowChange.getEventType();
//7.获取数据集
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
//8.遍历rowDataList,并打印数据集
for (CanalEntry.RowData rowData : rowDataList) {
JSONObject beforeData = new JSONObject();
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
beforeData.put(column.getName(), column.getValue());
}
JSONObject afterData = new JSONObject();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
afterData.put(column.getName(), column.getValue());
}
//数据打印
System.out.println("Table:" + tableName +
",EventType:" + eventType +
",Before:" + beforeData +
",After:" + afterData);
}
} else {
System.out.println("当前操作类型为:" + entryType);
}
}
}
}
}
}
测试
● 启动 canal-client
● 连接数据库 192.168.0.191
新增数据
mysql> insert into city(ID,Name,CountryCode,District,Population) values(99999,'luoma','AFG','luoma',99999);
Query OK, 1 row affected (0.01 sec)
canal-client 控制台输出
当次抓取没有数据,休息一会。。。。。。
当前操作类型为:TRANSACTIONBEGIN
Table:city,EventType:INSERT,Before:{},After:{"Population":"99999","ID":"99999","CountryCode":"AFG","District":"luoma","Name":"luoma"}
当前操作类型为:TRANSACTIONEND
当次抓取没有数据,休息一会。。。。。。
修改数据
mysql> update city set name='luoma888888' where id = 99999;
Query OK, 1 row affected (0.10 sec)
Rows matched: 1 Changed: 1 Warnings: 0
canal-client 控制台输出
当前操作类型为:TRANSACTIONBEGIN
Table:city,EventType:UPDATE,Before:{"Population":"99999","ID":"99999","CountryCode":"AFG","District":"luoma","Name":"luoma"},After:{"Population":"99999","ID":"99999","CountryCode":"AFG","District":"luoma","Name":"luoma888888"}
当前操作类型为:TRANSACTIONEND
删除数据
mysql> delete from city where id = 99999;
Query OK, 1 row affected (0.01 sec)
canal-client 控制台输出
当次抓取没有数据,休息一会。。。。。。
当前操作类型为:TRANSACTIONBEGIN
Table:city,EventType:DELETE,Before:{"Population":"99999","ID":"99999","CountryCode":"AFG","District":"luoma","Name":"luoma1"},After:{}
当前操作类型为:TRANSACTIONEND
【142】简单粗暴,20 分钟 ShardingJDBC 5 实现 MySQL 分库分表
来源:https://www.bilibili.com/video/BV12A4y197sa?spm_id_from=333.999.0.0
文档地址:
链接:https://pan.baidu.com/s/1Y42yVdVcI90kI0eN70GeLA
提取码:rje6
开发环境
名称 | 版本 |
---|---|
操作系统 | Windows 10 X64 |
VMware® Workstation 12 Pro | 12.0.0 build-2985596 |
CentOS7 | CentOS Linux release 7.8.2003 (Core) |
Linux | 3.10.0-1127.el7.x86_64 |
mysql【第一个数据源】 | 8.0.11,安装在本机 Windows 机器 |
mysql【第二个数据源】 | 5.7.16,安装在 192.168.16.128,CentOS 机器 |
ShardSphere
https://shardingsphere.apache.org/index_zh.html
https://shardingsphere.apache.org/document/current/cn/overview/
Database Plus
什么是 Apache ShardingSphere?
Apache ShardingSphere 是一套开源的分布式数据库增强计算引擎,其通过可插拔架构构建基于数据库之上的生态系统,实现包括数据分片、弹性伸缩、加密脱敏等功能为代表的增强能力。
ShardingJDBC
定位为轻量级 Java 框架,在 Java 的 JDBC 层提供的额外服务。 它使用客户端直连数据库,以 jar 包形式提供服务,无需额外部署和依赖,可理解为增强版的 JDBC 驱动,完全兼容 JDBC 和各种 ORM 框架。
● 适用于任何基于 JDBC 的 ORM 框架,如:JPA, Hibernate, Mybatis, Spring JDBC Template 或直接使用 JDBC;
● 支持任何第三方的数据库连接池,如:DBCP, C3P0, BoneCP, HikariCP 等;
● 支持任意实现 JDBC 规范的数据库,目前支持 MySQL,PostgreSQL,Oracle,SQLServer 以及任何可使用 JDBC 访问的数
据库。
ShardingSphere-Proxy
定位为透明化的数据库代理端,提供封装了数据库二进制协议的服务端版本,用于完成对异构语言的支持。
目前提供 MySQL 和 PostgreSQL(兼容 openGauss 等基于 PostgreSQL 的数据库)版本,它可以使用任何兼容 MySQL/PostgreSQL 协议的访问客户端(如:MySQL Command Client, MySQL Workbench, Navicat 等)操作数据,对 DBA 更加友好。
● 向应用程序完全透明,可直接当做 MySQL/PostgreSQL 使用;
● 适用于任何兼容 MySQL/PostgreSQL 协议的的客户端。
ShardingSphere-Sidecar
定位为 Kubernetes 的云原生数据库代理,以 Sidecar 的形式代理所有对数据库的访问。 通过无中心、零侵入的方案提供与数据库交互的啮合层,即 Database Mesh,又可称数据库网格。
Database Mesh 的关注重点在于如何将分布式的数据访问应用与数据库有机串联起来,它更加关注的是交互,是将杂乱无章的应用与数
据库之间的交互进行有效地梳理。 使用 Database Mesh,访问数据库的应用和数据库终将形成一个巨大的网格体系,应用和数据库只需
在网格体系中对号入座即可,它们都是被啮合层所治理的对象。
对比选型
ShardingSphere-JDBC | ShardingSphere-Proxy | ShardingSphere-Sidecar | |
---|---|---|---|
数据库 | 任意 | MySQL/PostgreSQL | MySQL/PostgreSQL |
连接消耗数 | 高 | 低 | 高 |
异构语言 | 仅 Java | 任意 | 任意 |
性能 | 损耗低 | 损耗略高 | 损耗低 |
无中心化 | 是 | 否 | 是 |
静态入口 | 无 | 有 | 无 |
新建数据库和表
mysql【第一个数据源】
版本 8.0.11,安装在本机 Windows 机器
create database if not exists testdb default character set utf8 collate utf8_general_ci;
use testdb;
create table employee
(
id bigint(20) primary key,
`name` varchar(100)
);
mysql【第二个数据源】
版本 5.7.16,安装在 192.168.16.128,CentOS 机器
create database if not exists testdb default character set utf8 collate utf8_general_ci;
use testdb;
create table employee
(
id bigint(20) primary key,
`name` varchar(100)
);
ShardingJDBC 分库分表快速上手
项目结构
pom.xml
第一步,新建Spring Boot2.x工程,引入 shardingsphere-jdbc 核心 starter
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.3</version>
</dependency>
完整内容
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>study-2022</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>shardingjdbc-demo</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-jdbc-core-spring-boot-starter</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.3</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>
application.properties
第二步,配置多数据源与分片策略
# 配置真实数据源,ds{0..1}
spring.shardingsphere.datasource.names=ds0,ds1
# 配置第一个数据源
spring.shardingsphere.datasource.ds0.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds0.jdbc-url=jdbc:mysql://localhost:3306/testdb
spring.shardingsphere.datasource.ds0.username=root
spring.shardingsphere.datasource.ds0.password=0000abc!
# 配置第二个数据源
spring.shardingsphere.datasource.ds1.type=com.zaxxer.hikari.HikariDataSource
spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds1.jdbc-url=jdbc:mysql://192.168.16.128:3306/testdb
spring.shardingsphere.datasource.ds1.username=root
spring.shardingsphere.datasource.ds1.password=0000abc!
# 定义数据源的分片规则,按 employee 表的 id % 2 取模得到数据应该放在哪个数据源
spring.shardingsphere.rules.sharding.sharding-algorithms.database-inline.type=INLINE
spring.shardingsphere.rules.sharding.sharding-algorithms.database-inline.props.algorithm-expression=ds$->{id % 2}
# 定义哪一个列用于生成主键
spring.shardingsphere.rules.sharding.tables.employee.key-generate-strategy.column=id
# 定义 employee 表哪个是分片字段,这里按主键字段 Id
spring.shardingsphere.rules.sharding.tables.employee.database-strategy.standard.sharding-column=id
# 将 employee 表与分片规则 database-inline 绑定
spring.shardingsphere.rules.sharding.tables.employee.database-strategy.standard.sharding-algorithm-name=database-inline
# 默认主键生成策略采用 snowflake
spring.shardingsphere.sharding.default-key-generate-strategy.xxx=snowflake
# snowflake 算法配置
spring.shardingsphere.rules.sharding.key-generators.snowflake.type=SNOWFLAKE
# 机器唯一标识
spring.shardingsphere.rules.sharding.key-generators.snowflake.props.worker-id=666
# 显示分库分表后执行的 SQL 语句
spring.shardingsphere.props.sql-show=true
主启动类-ShardingjdbcApplication
package com.itlaoqi.shardingjdbc;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@MapperScan
@SpringBootApplication
public class ShardingjdbcApplication {
public static void main(String[] args) {
SpringApplication.run(ShardingjdbcApplication.class,args);
}
}
数据库相关类
package com.itlaoqi.shardingjdbc.entity;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
@TableName("employee")
public class Employee {
@TableId
private Long id;
private String name;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
package com.itlaoqi.shardingjdbc.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.itlaoqi.shardingjdbc.entity.Employee;
public interface EmployeeMapper extends BaseMapper<Employee> {
}
测试类和方法-EmployeeTestor
利用 MyBatis 或者 Hibernate 等 ORM 框架进行数据操作
package com.itlaoqi.shardingjdbc;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.itlaoqi.shardingjdbc.entity.Employee;
import com.itlaoqi.shardingjdbc.mapper.EmployeeMapper;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
import java.util.List;
@SpringBootTest
public class EmployeeTestor {
@Resource
private EmployeeMapper employeeMapper;
@Test
public void testInsert(){
for(int i = 0 ; i < 10 ; i++) {
Employee employee = new Employee();
employee.setName("MJ" + i);
employeeMapper.insert(employee);
}
}
@Test
public void testSelect(){
List<Employee> employees = employeeMapper.selectList(new QueryWrapper<>());
}
}
测试
新增数据-testInsert
运行 testInsert
方法,查看控制台日志
部分日志如下,可以看到使用 id 取模 2 在 ds0,ds1 两个节点加入数据
ShardingSphere-SQL: Logic SQL: INSERT INTO employee ( id,name ) VALUES ( ?,? )
ShardingSphere-SQL: SQLStatement: MySQLInsertStatement(setAssignment=Optional.empty, onDuplicateKeyColumns=Optional.empty)
ShardingSphere-SQL: Actual SQL: ds0 ::: INSERT INTO employee ( id,name ) VALUES (?, ?) ::: [1514101119097311234, MJ0]
ShardingSphere-SQL: Logic SQL: INSERT INTO employee ( id,name ) VALUES ( ?,? )
ShardingSphere-SQL: SQLStatement: MySQLInsertStatement(setAssignment=Optional.empty, onDuplicateKeyColumns=Optional.empty)
ShardingSphere-SQL: Actual SQL: ds1 ::: INSERT INTO employee ( id,name ) VALUES (?, ?) ::: [1514101126974214145, MJ1]
ShardingSphere-SQL: Logic SQL: INSERT INTO employee ( id,name ) VALUES ( ?,? )
ShardingSphere-SQL: SQLStatement: MySQLInsertStatement(setAssignment=Optional.empty, onDuplicateKeyColumns=Optional.empty)
ShardingSphere-SQL: Actual SQL: ds0 ::: INSERT INTO employee ( id,name ) VALUES (?, ?) ::: [1514101126974214146, MJ2]
ShardingSphere-SQL: Logic SQL: INSERT INTO employee ( id,name ) VALUES ( ?,? )
ShardingSphere-SQL: SQLStatement: MySQLInsertStatement(setAssignment=Optional.empty, onDuplicateKeyColumns=Optional.empty)
ShardingSphere-SQL: Actual SQL: ds1 ::: INSERT INTO employee ( id,name ) VALUES (?, ?) ::: [1514101127502696449, MJ3]
ShardingSphere-SQL: Logic SQL: INSERT INTO employee ( id,name ) VALUES ( ?,? )
ShardingSphere-SQL: SQLStatement: MySQLInsertStatement(setAssignment=Optional.empty, onDuplicateKeyColumns=Optional.empty)
ShardingSphere-SQL: Actual SQL: ds0 ::: INSERT INTO employee ( id,name ) VALUES (?, ?) ::: [1514101127825657858, MJ4]
ShardingSphere-SQL: Logic SQL: INSERT INTO employee ( id,name ) VALUES ( ?,? )
ShardingSphere-SQL: SQLStatement: MySQLInsertStatement(setAssignment=Optional.empty, onDuplicateKeyColumns=Optional.empty)
ShardingSphere-SQL: Actual SQL: ds1 ::: INSERT INTO employee ( id,name ) VALUES (?, ?) ::: [1514101128157007873, MJ5]
ShardingSphere-SQL: Logic SQL: INSERT INTO employee ( id,name ) VALUES ( ?,? )
ShardingSphere-SQL: SQLStatement: MySQLInsertStatement(setAssignment=Optional.empty, onDuplicateKeyColumns=Optional.empty)
ShardingSphere-SQL: Actual SQL: ds0 ::: INSERT INTO employee ( id,name ) VALUES (?, ?) ::: [1514101128157007874, MJ6]
ShardingSphere-SQL: Logic SQL: INSERT INTO employee ( id,name ) VALUES ( ?,? )
ShardingSphere-SQL: SQLStatement: MySQLInsertStatement(setAssignment=Optional.empty, onDuplicateKeyColumns=Optional.empty)
ShardingSphere-SQL: Actual SQL: ds1 ::: INSERT INTO employee ( id,name ) VALUES (?, ?) ::: [1514101128475774977, MJ7]
ShardingSphere-SQL: Logic SQL: INSERT INTO employee ( id,name ) VALUES ( ?,? )
ShardingSphere-SQL: SQLStatement: MySQLInsertStatement(setAssignment=Optional.empty, onDuplicateKeyColumns=Optional.empty)
ShardingSphere-SQL: Actual SQL: ds0 ::: INSERT INTO employee ( id,name ) VALUES (?, ?) ::: [1514101128475774978, MJ8]
ShardingSphere-SQL: Logic SQL: INSERT INTO employee ( id,name ) VALUES ( ?,? )
ShardingSphere-SQL: SQLStatement: MySQLInsertStatement(setAssignment=Optional.empty, onDuplicateKeyColumns=Optional.empty)
ShardingSphere-SQL: Actual SQL: ds0 ::: INSERT INTO employee ( id,name ) VALUES (?, ?) ::: [1514101128815513602, MJ9]
【mysql【第一个数据源】 | 8.0.11,安装在本机 Windows 机器 】
select * from employee
【mysql【第二个数据源】 | 5.7.16,安装在 192.168.16.128,CentOS 机器】
select * from employee
查询数据-testSelect
运行 testSelect
方法,查看控制台日志,可以看到 ds0,ds1 两个节点都查询了数据
ShardingSphere-SQL: Logic SQL: SELECT id,name FROM employee
ShardingSphere-SQL: SQLStatement: MySQLSelectStatement(table=Optional.empty, limit=Optional.empty,lock=Optional.empty, window=Optional.empty)
ShardingSphere-SQL: Actual SQL: ds0 ::: SELECT id,name FROM employee
ShardingSphere-SQL: Actual SQL: ds1 ::: SELECT id,name FROM employee