Java: Dynamic RPC Service Registration, Discovery, and Invocation with ZooKeeper + Thrift

July 15, 2022, 15:41

Development environment

Name              Version
Operating system  Windows 10 x64
JDK               JDK 1.8 (jdk-8u151-windows-x64)
IntelliJ IDEA     2021.2
Maven             3.6.0
ZooKeeper         3.8.0

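References

Dynamic RPC service registration, discovery, and invocation based on ZooKeeper + Thrift (Java)

How to install ZooKeeper on Windows

Setting up Thrift and running a Java demo

Source code download

demo-thrift.zip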

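Installing ZooKeeper on Windows

Download

Download the installation package apache-zookeeper-3.8.0-bin.tar.gz from the official site.

Apache download page: https://zookeeper.apache.org/releases.html

Extract

Extract apache-zookeeper-3.8.0-bin.tar.gz to D:\Program Files\apache-zookeeper-3.8.0-bin

Create directories

  1. D:\Program Files\apache-zookeeper-3.8.0-bin\data
  2. D:\Program Files\apache-zookeeper-3.8.0-bin\log

Edit the configuration

Copy D:\Program Files\apache-zookeeper-3.8.0-bin\conf\zoo_sample.cfg
and name the copy D:\Program Files\apache-zookeeper-3.8.0-bin\conf\zoo.cfg

Open zoo.cfg and change the settings below. Write the paths with forward slashes (or doubled backslashes): the file is parsed as a Java properties file, where a single backslash is an escape character and is dropped from the value.

    dataDir=D:/Program Files/apache-zookeeper-3.8.0-bin/data
    dataLogDir=D:/Program Files/apache-zookeeper-3.8.0-bin/log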
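Path separators in zoo.cfg are easy to get wrong on Windows. The following is a minimal sketch, assuming zoo.cfg is read with java.util.Properties; it shows how the same directory is parsed when written with single backslashes, forward slashes, and doubled backslashes. The class name ZooCfgPathDemo and the short path D:\zk\data are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ZooCfgPathDemo {

    /** Parses one line of properties text and returns the value of dataDir. */
    public static String dataDirOf(String cfgLine) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(cfgLine));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("dataDir");
    }

    public static void main(String[] args) {
        // File text: dataDir=D:\zk\data  -> the backslashes are consumed as escapes
        System.out.println(dataDirOf("dataDir=D:\\zk\\data"));     // prints D:zkdata
        // File text: dataDir=D:/zk/data  -> kept as written
        System.out.println(dataDirOf("dataDir=D:/zk/data"));       // prints D:/zk/data
        // File text: dataDir=D:\\zk\\data -> each pair becomes one backslash
        System.out.println(dataDirOf("dataDir=D:\\\\zk\\\\data")); // prints D:\zk\data
    }
}
```

If the data directory ends up somewhere unexpected after the server starts, this escaping rule is the first thing worth checking.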

Start the ZooKeeper server

Run

    D:\Program Files\apache-zookeeper-3.8.0-bin\bin\zkServer.cmd

Start the ZooKeeper client

Run

    D:\Program Files\apache-zookeeper-3.8.0-bin\bin\zkCli.cmd

If "Welcome to ZooKeeper!" appears, the client started successfully.
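Besides zkCli.cmd, a plain TCP connection attempt is a quick way to check from Java that something is listening on the ZooKeeper port. This is only a sketch: it assumes the default client port 2181 from zoo_sample.cfg, the class name ZkPortProbe is hypothetical, and an open port does not prove that the process behind it is a healthy ZooKeeper server.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class ZkPortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable
            return false;
        }
    }

    public static void main(String[] args) {
        // 2181 is assumed to be the clientPort configured in zoo.cfg
        System.out.println("ZooKeeper port reachable: " + isReachable("127.0.0.1", 2181, 2000));
    }
}
```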

Installing the ZooKeeper plugin for IDEA

Install the plugin "Zookeeper tool".

After installation it is available in the sidebar.

Project structure

New project: demo-thrift

pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>demo.thrift</groupId>
        <artifactId>demo-thrift</artifactId>
        <packaging>pom</packaging>
        <version>1.0-SNAPSHOT</version>

        <modules>
            <module>demo-thrift-contract</module>
            <module>demo-thrift-common</module>
            <module>demo-thrift-server</module>
            <module>demo-thrift-web</module>
        </modules>

        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-collections4</artifactId>
                <version>4.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>4.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-x-discovery</artifactId>
                <version>4.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-test</artifactId>
                <version>4.0.0</version>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </project>

New module: demo-thrift-contract

pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>demo-thrift</artifactId>
            <groupId>demo.thrift</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>

        <artifactId>demo-thrift-contract</artifactId>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>

        <dependencies>
            <dependency>
                <groupId>org.apache.thrift</groupId>
                <artifactId>libthrift</artifactId>
                <version>0.9.2</version>
            </dependency>
        </dependencies>

        <build>
            <finalName>${project.name}</finalName>
            <plugins>
                <plugin>
                    <groupId>org.apache.thrift.tools</groupId>
                    <artifactId>maven-thrift-plugin</artifactId>
                    <version>0.1.11</version>
                    <configuration>
                        <!-- If the thrift executable is on the PATH, the short name is enough; otherwise give the full path to the executable -->
                        <thriftExecutable>thrift</thriftExecutable>
                        <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
                    </configuration>
                    <executions>
                        <execution>
                            <id>thrift-sources</id>
                            <phase>generate-sources</phase>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                        <execution>
                            <id>thrift-test-sources</id>
                            <phase>generate-test-sources</phase>
                            <goals>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>

thrift

CommonResult.thrift

    namespace java demo.thrift.contract

    exception CommonException {
        1: i32 errorCode,
        2: string message
    }

    struct CommonResultDto {
        1: bool flag = true;       // execution flag
        2: string message;         // error message
        3: string data;            // data
        4: bool overflow = false;  // overflow flag
    }

TestTableService.thrift

    namespace java demo.thrift.contract

    include "CommonResult.thrift"

    struct TestTableDto {
        1: i32 id,
        2: string no,
        3: string name,
        4: i32 isDelete
    }

    struct TestTablePageDto {
        1: optional i32 total = 0,
        2: optional list<TestTableDto> resultDtos
    }

    service TestTableService {
        CommonResult.CommonResultDto add(1: TestTableDto param) throws (1: CommonResult.CommonException ex)
    }

Generate the Java code

The Java code is generated automatically under the demo.thrift.contract package when the module is built, because the maven-thrift-plugin configured in the pom.xml runs in the generate-sources phase.

New module: demo-thrift-common

pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>demo-thrift</artifactId>
            <groupId>demo.thrift</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>

        <artifactId>demo-thrift-common</artifactId>

        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    </project>

ZookeeperUtils

    package demo.jarvis.common.Utils;

    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    import org.apache.curator.retry.RetryForever;
    import org.apache.curator.utils.CloseableUtils;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.data.ACL;

    import java.util.List;

    public class ZookeeperUtils implements AutoCloseable {

        /** ZK service name. */
        private String name;

        /** ZK server connection string. */
        private String zkConnectedStr;

        /** The wrapped Curator client. */
        private CuratorFramework client;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getZkConnectedStr() {
            return zkConnectedStr;
        }

        public void setZkConnectedStr(String zkConnectedStr) {
            this.zkConnectedStr = zkConnectedStr;
        }

        public CuratorFramework getClient() {
            return client;
        }

        public void setClient(CuratorFramework client) {
            this.client = client;
        }

        /**
         * Custom uncaught-exception handler. For now it only prints the exception.
         */
        private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
                (t, e) -> System.out.println(name + ": " + e);

        /**
         * Starts the ZK client and blocks until it is connected to the ZK server.
         *
         * @param name           ZK service name
         * @param zkConnectedStr ZK server connection string
         */
        public ZookeeperUtils(String name, String zkConnectedStr) throws Exception {
            try {
                this.name = name;
                this.zkConnectedStr = zkConnectedStr;
                // Retry forever, waiting 10 seconds between attempts
                RetryPolicy retryPolicy = new RetryForever(10000);
                this.client = CuratorFrameworkFactory.builder()
                        .connectString(zkConnectedStr)
                        .retryPolicy(retryPolicy)
                        .sessionTimeoutMs(30 * 1000)
                        .connectionTimeoutMs(30 * 1000)
                        .maxCloseWaitMs(60 * 1000)
                        .threadFactory(new ThreadFactoryBuilder()
                                .setNameFormat(name + "-%d")
                                .setUncaughtExceptionHandler(uncaughtExceptionHandler)
                                .build())
                        .build();
                this.client.start();
                this.client.blockUntilConnected();
                System.out.println(String.format("zk : %s started.", this.zkConnectedStr));
            } catch (InterruptedException e) {
                // Restore the interrupt flag before propagating
                Thread.currentThread().interrupt();
                throw new Exception(e);
            }
        }

        /**
         * Adds a node, creating missing parent nodes.
         *
         * @param path  node path
         * @param value node data
         * @param mode  create mode (persistent, ephemeral, ...)
         * @return the path that was actually created
         */
        public String add(String path, byte[] value, CreateMode mode) throws Exception {
            return this.client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, value);
        }

        /**
         * Adds a node with an ACL list, creating missing parent nodes.
         *
         * @param path    node path
         * @param value   node data
         * @param mode    create mode (persistent, ephemeral, ...)
         * @param aclList ACLs to apply to the node
         * @return the path that was actually created
         */
        public String add(String path, byte[] value, CreateMode mode, List<ACL> aclList) throws Exception {
            return this.client.create().creatingParentsIfNeeded().withMode(mode).withACL(aclList).forPath(path, value);
        }

        /**
         * Checks whether a node exists.
         *
         * @param path node path
         * @return true if the node exists; false if it does not or the check failed
         */
        public boolean exist(String path) {
            try {
                return this.client.checkExists().forPath(path) != null;
            } catch (Exception e) {
                System.out.println(String.format("zk check exist error, path = %s , %s", path, e));
                return false;
            }
        }

        /**
         * Removes a node.
         *
         * @param path node path
         */
        public void remove(String path) throws Exception {
            this.client.delete().forPath(path);
        }

        /**
         * Sets the data of a node.
         *
         * @param path  node path
         * @param value new node data
         */
        public void set(String path, byte[] value) throws Exception {
            this.client.setData().forPath(path, value);
        }

        /**
         * Gets the names of all child nodes of a node.
         *
         * @param nodePath parent node path
         * @return the child node names, or null if the lookup failed
         */
        public List<String> getChildren(String nodePath) {
            try {
                return this.client.getChildren().forPath(nodePath);
            } catch (Exception e) {
                System.out.println(String.format("get node children failed, nodePath = %s , %s", nodePath, e));
            }
            return null;
        }

        /**
         * Registers a listener for changes to the children of a node.
         *
         * @param nodePath parent node path
         * @param listener listener to notify
         * @return the started PathChildrenCache, or null if registration failed
         */
        public PathChildrenCache registerPathChildrenListener(String nodePath, PathChildrenCacheListener listener) {
            try {
                // Create a PathChildrenCache that also caches node data
                PathChildrenCache pathChildrenCache = new PathChildrenCache(this.client, nodePath, true);
                // Add the child-node listener
                pathChildrenCache.getListenable().addListener(listener);
                // Start the cache; the initial children are loaded before start() returns
                pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
                return pathChildrenCache;
            } catch (Exception e) {
                System.out.println(String.format("register path children node cache listener failed, nodePath = %s , %s", nodePath, e));
            }
            return null;
        }

        @Override
        public void close() throws Exception {
            System.out.println(String.format("zk %s - %s closed", this.name, this.zkConnectedStr));
            CloseableUtils.closeQuietly(this.client);
        }
    }

New module: demo-thrift-server

pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>demo-thrift</artifactId>
            <groupId>demo.thrift</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>

        <artifactId>demo-thrift-server</artifactId>

        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>

        <dependencies>
            <dependency>
                <groupId>demo.thrift</groupId>
                <artifactId>demo-thrift-contract</artifactId>
                <version>1.0-SNAPSHOT</version>
            </dependency>
            <dependency>
                <groupId>demo.thrift</groupId>
                <artifactId>demo-thrift-common</artifactId>
                <version>1.0-SNAPSHOT</version>
            </dependency>
        </dependencies>
    </project>

TestTableServiceImpl

    package com.thrift.server.serviceImpl;

    import demo.thrift.contract.CommonResultDto;
    import demo.thrift.contract.TestTableDto;
    import demo.thrift.contract.TestTableService;
    import org.apache.thrift.TException;

    public class TestTableServiceImpl implements TestTableService.Iface {

        @Override
        public CommonResultDto add(TestTableDto param) throws TException {
            // Echo the request back so the caller can see which record was "inserted"
            CommonResultDto resultDto = new CommonResultDto();
            resultDto.setData("insert:" + param.getNo() + "-" + param.getName());
            return resultDto;
        }
    }

ThriftServer

    package com.thrift.server;

    import com.thrift.server.serviceImpl.TestTableServiceImpl;
    import demo.jarvis.common.Utils.ZookeeperUtils;
    import demo.thrift.contract.TestTableService;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TThreadedSelectorServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TNonblockingServerSocket;
    import org.apache.thrift.transport.TTransportException;
    import org.apache.zookeeper.CreateMode;

    import java.net.InetAddress;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ThriftServer {

        /** Top-level parent node shared by all RPC-related nodes. */
        private static final String THRIFT_SERVER_PREFIX = "/thrift/";

        /** Single-thread executor that runs the blocking serve() call. */
        private final ExecutorService thread = Executors.newSingleThreadExecutor();

        /** ZK client instance. */
        private final ZookeeperUtils zookeeperUtils;

        /** RPC service name. */
        private final String name;

        /** RPC service port. */
        private final int port;

        /**
         * Starts the RPC node and registers it in ZK.
         *
         * @param port           RPC service port
         * @param name           RPC service name
         * @param zookeeperUtils ZK client instance
         */
        public ThriftServer(int port, String name, ZookeeperUtils zookeeperUtils) throws Exception {
            this.zookeeperUtils = zookeeperUtils;
            this.name = name;
            this.port = port;
            this.startAndRegisterService();
        }

        /**
         * Builds the node path under which this service registers in ZK.
         * The path is unique per machine IP and port.
         *
         * @return the path, or null if the local address cannot be resolved
         */
        private String getServiceNodePath() {
            try {
                InetAddress inetAddress = InetAddress.getLocalHost();
                String servicePath = inetAddress.getHostAddress();
                return "/".concat(servicePath).concat(":").concat(String.valueOf(port));
            } catch (Exception e) {
                return null;
            }
        }

        /**
         * Registers the parent node of the RPC service.
         * For an RPC service named hello-server, the result is the
         * persistent node /thrift/hello-server.
         */
        private void register() throws Exception {
            String parentPath = THRIFT_SERVER_PREFIX.concat(name);
            if (!zookeeperUtils.exist(parentPath)) {
                try {
                    zookeeperUtils.add(parentPath, name.getBytes(), CreateMode.PERSISTENT);
                } catch (Exception e) {
                    // Another instance may have created the node between the check and the create;
                    // that is fine as long as the node exists now.
                    if (!zookeeperUtils.exist(parentPath)) {
                        throw e;
                    }
                }
            }
        }

        /**
         * Registers this RPC service instance.
         * The result is an ephemeral node such as /thrift/hello-server/10.1.38.226:7778.
         */
        private void registerService() throws Exception {
            register();
            String serviceNodePath = getServiceNodePath();
            if (StringUtils.isBlank(serviceNodePath)) {
                return;
            }
            zookeeperUtils.add(THRIFT_SERVER_PREFIX.concat(name).concat(serviceNodePath),
                    String.valueOf(port).getBytes(), CreateMode.EPHEMERAL);
        }

        /**
         * Starts the RPC service.
         *
         * @return true if the service started
         */
        private boolean start() {
            try {
                // Build the Thrift server
                TServer server = new TThreadedSelectorServer(
                        new TThreadedSelectorServer.Args(new TNonblockingServerSocket(port))
                                .protocolFactory(new TBinaryProtocol.Factory())
                                .processor(new TestTableService.Processor<>(new TestTableServiceImpl()))
                                .workerThreads(5)
                                .transportFactory(new TFramedTransport.Factory()));
                // serve() blocks, so run it on the executor thread
                thread.submit(server::serve);
                // Poll until the RPC server reports that it is serving
                while (!server.isServing()) {
                    System.out.println("wait for thrift server start!");
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and give up waiting
                        Thread.currentThread().interrupt();
                        return false;
                    }
                }
            } catch (TTransportException e) {
                System.out.println(e);
                return false;
            }
            return true;
        }

        /**
         * Starts the RPC service and registers its ZK node.
         * A service that failed to start is not registered.
         */
        private void startAndRegisterService() throws Exception {
            if (!start()) {
                throw new IllegalStateException(name.concat(" start failed!"));
            }
            registerService();
        }

        public static void main(String[] args) throws Exception {
            // Create the ZK client instance
            ZookeeperUtils zookeeperUtils = new ZookeeperUtils("thrift-test", "127.0.0.1:2181");
            // Start the first RPC service asynchronously
            new Thread("thrift-server-1") {
                @Override
                public void run() {
                    try {
                        new ThriftServer(7777, "thrift-server", zookeeperUtils);
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                    }
                }
            }.start();
            // Start the second RPC service asynchronously
            new Thread("thrift-server-2") {
                @Override
                public void run() {
                    try {
                        new ThriftServer(7778, "thrift-server", zookeeperUtils);
                    } catch (Exception e) {
                        System.out.println(e.getMessage());
                    }
                }
            }.start();
        }
    }
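The start() method waits for the Thrift server by polling isServing() in a loop. If serve() never reaches the serving state, a loop without a deadline waits forever. Below is a minimal JDK-only sketch of the same polling idea with a timeout. The class StartupWait and its await method are hypothetical helpers, not part of the demo or of Thrift; in the demo the condition would be server::isServing.

```java
import java.util.function.BooleanSupplier;

public class StartupWait {

    /**
     * Polls the condition until it holds or the timeout elapses.
     *
     * @return true if the condition became true in time, false on timeout or interrupt
     */
    public static boolean await(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // deadline passed
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in for server::isServing: becomes true after roughly 200 ms
        boolean started = await(() -> System.currentTimeMillis() - start >= 200, 2000, 50);
        System.out.println("started: " + started);
    }
}
```

With a helper like this, a failed start can be reported after a bounded wait instead of blocking the registration thread indefinitely.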

New module: demo-thrift-web

pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>demo-thrift</artifactId>
            <groupId>demo.thrift</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>

        <artifactId>demo-thrift-web</artifactId>

        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>

        <dependencies>
            <dependency>
                <groupId>demo.thrift</groupId>
                <artifactId>demo-thrift-contract</artifactId>
                <version>1.0-SNAPSHOT</version>
            </dependency>
            <dependency>
                <groupId>demo.thrift</groupId>
                <artifactId>demo-thrift-common</artifactId>
                <version>1.0-SNAPSHOT</version>
            </dependency>
        </dependencies>
    </project>

ThriftWeb

    package demo.thrift.web;

    import demo.jarvis.common.Utils.ZookeeperUtils;
    import demo.thrift.contract.CommonResultDto;
    import demo.thrift.contract.TestTableDto;
    import demo.thrift.contract.TestTableService;
    import org.apache.commons.collections4.MapUtils;
    import org.apache.commons.lang3.ArrayUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.curator.framework.recipes.cache.ChildData;
    import org.apache.thrift.TException;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.apache.thrift.transport.TTransportException;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.TimeUnit;

    public class ThriftWeb {

        /** The client knows the node path of its RPC service, so it is hard-coded. */
        private static final String NODE_PATH = "/thrift/thrift-server";

        /** Separator between IP and port. */
        private static final String SPLIT_STR = ":";

        /** ZK client instance. */
        private final ZookeeperUtils zookeeperUtils;

        /** RPC connection pool, kept in a simple local map keyed by "ip:port". */
        private final Map<String, TProtocol> protocolMap = new ConcurrentHashMap<>();

        /**
         * Discovers the RPC services and starts listening for changes.
         *
         * @param zookeeperUtils ZK client instance
         */
        public ThriftWeb(ZookeeperUtils zookeeperUtils) {
            this.zookeeperUtils = zookeeperUtils;
            this.serverDetect();
            this.serverListening();
        }

        /**
         * Gets an RPC service connection.
         *
         * @return a connection, or null if the pool is empty
         */
        private TProtocol getProtocol() {
            // Nothing to return if the pool is empty
            if (MapUtils.isEmpty(protocolMap)) {
                return null;
            }
            // Pick a connection at random
            List<TProtocol> tmp = new ArrayList<>(protocolMap.values());
            if (tmp.isEmpty()) {
                // The pool was emptied by the listener between the check and the copy
                return null;
            }
            Collections.shuffle(tmp);
            // Print which connection is used
            System.out.println(tmp.get(0).toString());
            return tmp.get(0);
        }

        /**
         * Adds the RPC service behind a ZK child node to the connection pool.
         *
         * @param path child node name in the form "ip:port"
         */
        private void inProtocolPool(String path) {
            if (StringUtils.isBlank(path)) {
                return;
            }
            // Parse the service address from the node name
            String[] address = path.split(SPLIT_STR);
            if (ArrayUtils.isEmpty(address) || address.length != 2) {
                return;
            }
            if (protocolMap.containsKey(path)) {
                return;
            }
            try {
                // Open a framed connection to the RPC server
                TTransport tTransport = new TSocket(address[0], Integer.parseInt(address[1]));
                TFramedTransport framedTransport = new TFramedTransport(tTransport);
                TProtocol tProtocol = new TBinaryProtocol(framedTransport);
                framedTransport.open();
                // Add to the pool
                protocolMap.put(path, tProtocol);
            } catch (TTransportException | NumberFormatException e) {
                System.out.println(e);
            }
        }

        /**
         * Removes the RPC service behind a ZK child node from the connection pool.
         *
         * @param path child node name in the form "ip:port"
         */
        private void outProtocolPool(String path) {
            // Remove from the pool
            TProtocol protocol = protocolMap.remove(path);
            if (protocol == null) {
                return;
            }
            // Close the connection
            protocol.getTransport().close();
        }

        /**
         * Discovers the RPC services that are already registered.
         */
        private void serverDetect() {
            List<String> childrenNodePaths = zookeeperUtils.getChildren(NODE_PATH);
            if (childrenNodePaths != null) {
                childrenNodePaths.forEach(this::inProtocolPool);
            }
        }

        /**
         * Listens for RPC services that come and go.
         */
        private void serverListening() {
            // Register the ZK child-node listener
            zookeeperUtils.registerPathChildrenListener(NODE_PATH, (curatorClient, event) -> {
                // Data of the node that changed. Connection-state events
                // (CONNECTION_LOST, CONNECTION_RECONNECTED, ...) concern the ZK
                // connection itself and carry no child data, so they are skipped here.
                ChildData childData = event.getData();
                if (childData == null) {
                    return;
                }
                // The last path segment is the "ip:port" of the RPC node
                String nodeName = childData.getPath().substring(childData.getPath().lastIndexOf("/") + 1);
                switch (event.getType()) {
                    case CHILD_ADDED: // an RPC node was added
                        System.out.println(String.format("path children add children node %s now", childData.getPath()));
                        inProtocolPool(nodeName);
                        break;
                    case CHILD_REMOVED: // an RPC node went away
                        System.out.println(String.format("path children delete children node %s now", childData.getPath()));
                        outProtocolPool(nodeName);
                        break;
                    default: // nothing to do
                        break;
                }
            });
        }

        /**
         * Client-side addTestTable.
         *
         * @param param the record to add
         * @return the result, or null if no service is available or the call failed
         */
        private CommonResultDto addTestTable(TestTableDto param) {
            // Get an RPC service connection
            TProtocol protocol = this.getProtocol();
            if (protocol == null) {
                return null;
            }
            // Create an RPC client on that connection
            TestTableService.Client client = new TestTableService.Client(protocol);
            try {
                return client.add(param);
            } catch (TException e) {
                System.out.println(e);
            }
            return null;
        }

        public static void main(String[] args) throws Exception {
            // ZK client instance
            ZookeeperUtils zookeeperUtils = new ZookeeperUtils("thrift-test", "127.0.0.1:2181");
            // Thrift client
            ThriftWeb thriftWeb = new ThriftWeb(zookeeperUtils);
            // Every five seconds, call addTestTable three times and print the results.
            // The calls do not all go to the same RPC service; one is picked at random each time.
            while (true) {
                for (int i = 0; i < 3; i++) {
                    TestTableDto testTableDto = new TestTableDto();
                    testTableDto.setNo("no-" + i);
                    testTableDto.setName("name-" + i);
                    CommonResultDto result = thriftWeb.addTestTable(testTableDto);
                    System.out.println(result == null ? "no RPC service available" : result.getData());
                }
                TimeUnit.SECONDS.sleep(5);
            }
        }
    }
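The client turns each child node name, such as 10.1.38.226:7778, into a host and a port before opening a socket. A slightly stricter parser can reject malformed names up front instead of failing inside Integer.parseInt. The sketch below uses only the JDK; the class ServiceAddress and its parse method are illustrative names that do not exist in the demo.

```java
public class ServiceAddress {

    public final String host;
    public final int port;

    private ServiceAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Parses a node name of the form "host:port".
     *
     * @return the parsed address, or null if the name is malformed
     */
    public static ServiceAddress parse(String nodeName) {
        if (nodeName == null) {
            return null;
        }
        int idx = nodeName.indexOf(':');
        // Require exactly one colon with text on both sides
        if (idx <= 0 || idx != nodeName.lastIndexOf(':') || idx == nodeName.length() - 1) {
            return null;
        }
        try {
            int port = Integer.parseInt(nodeName.substring(idx + 1));
            if (port < 1 || port > 65535) {
                return null;
            }
            return new ServiceAddress(nodeName.substring(0, idx), port);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        ServiceAddress address = parse("10.1.38.226:7778");
        System.out.println(address.host + " -> " + address.port);
        System.out.println(parse("not-an-address") == null);
    }
}
```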

Testing

demo-thrift-server

● Run ThriftServer.main() in demo-thrift-server

Console output

    log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    zk : 127.0.0.1:2181 started.
    wait for thrift server start!
    wait for thrift server start!

demo-thrift-web

● Run ThriftWeb.main() in demo-thrift-web

Console output

    log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    zk : 127.0.0.1:2181 started.
    org.apache.thrift.protocol.TBinaryProtocol@536aaa8d
    insert:no-0-name-0
    org.apache.thrift.protocol.TBinaryProtocol@536aaa8d
    insert:no-1-name-1
    org.apache.thrift.protocol.TBinaryProtocol@528931cf
    insert:no-2-name-2
    org.apache.thrift.protocol.TBinaryProtocol@528931cf
    insert:no-0-name-0
    org.apache.thrift.protocol.TBinaryProtocol@528931cf
    insert:no-1-name-1
    org.apache.thrift.protocol.TBinaryProtocol@528931cf
    insert:no-2-name-2
    org.apache.thrift.protocol.TBinaryProtocol@528931cf
    insert:no-0-name-0
    org.apache.thrift.protocol.TBinaryProtocol@528931cf
    insert:no-1-name-1
    org.apache.thrift.protocol.TBinaryProtocol@536aaa8d
    insert:no-2-name-2
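In this output the two TBinaryProtocol instances appear in no fixed order, because the client picks a connection at random and can hit the same server several times in a row. If an even spread is preferred, round-robin selection is one alternative. The following JDK-only sketch illustrates the idea; ConnectionPicker and its methods are hypothetical and not part of the demo, and T stands in for TProtocol.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionPicker<T> {

    private final Map<String, T> pool = new ConcurrentHashMap<>();
    private final AtomicInteger counter = new AtomicInteger();

    public void put(String key, T connection) {
        pool.put(key, connection);
    }

    public void remove(String key) {
        pool.remove(key);
    }

    /**
     * Returns the next connection in round-robin order, or null if the pool is empty.
     * May also return null if the chosen entry is removed concurrently.
     */
    public T next() {
        List<String> keys = new ArrayList<>(pool.keySet());
        if (keys.isEmpty()) {
            return null;
        }
        // Sort so that every call sees the keys in the same order
        Collections.sort(keys);
        // floorMod keeps the index non-negative even after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), keys.size());
        return pool.get(keys.get(index));
    }

    public static void main(String[] args) {
        ConnectionPicker<String> picker = new ConnectionPicker<>();
        picker.put("127.0.0.1:7777", "server-1");
        picker.put("127.0.0.1:7778", "server-2");
        for (int i = 0; i < 4; i++) {
            System.out.println(picker.next()); // alternates server-1, server-2
        }
    }
}
```

Random selection is simpler and needs no shared counter; round-robin trades that for a predictable spread across the registered servers.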