由于UDP协议只提供数据的不可靠传输,将接收到的数据保存到BlockingQueue中

由于UDP协议只提供数据的不可靠传输,会通过udp发送一些信息

在Linux 上,编写一个每秒接收 100万UDP数据包的程序究竟有多难?,udp有多难

在Linux 上,编写一个每秒接收 100万UDP数据包的程序究竟有多难?
写的不错,转载一下

UDP接收百万级数据的解决方案,udp接收解决方案

1. UDP概念

   用户数据报协议(英语:User Datagram
Protocol,缩写为 UDP),又称使用者资料包协定,是一个简单的面向数据报的传输层协议,正式规范为RFC
768

 
 在TCP/IP模型中,UDP为网络层以上和应用层以下提供了一个简单的接口。UDP只提供数据的不可靠传递,它一旦把应用程序发给网络层的数据发送出去,就不保留数据备份(所以UDP有时候也被认为是不可靠的数据报协议)。UDP在IP数据报的头部仅仅加入了复用和数据校验(字段)。

小序

到新公司不久,就接到一个任务:有个发送方,会通过udp发送一些信息,然后服务接收到信息后保存到数据库的一张表A,保存的这些数据在经过一系列处理,处理完成后累积到另一张表B,然后清空处理的表A的数据。目前发送方比较少,不久就要增加到100个。

2. UDP丢包问题分析

由于UDP协议只提供数据的不可靠传输,数据包发出去后就不管了,数据包在网络的传输过程中都可能丢失。甚至即使数据包成功到达了接收端节点,也不意味着应用程序能够收到,因为要从网卡到达应用程序还需要经历很多阶段,每个阶段都可能丢包。

上图描述了一种应用程序接受网络数据包的典型路径图。

首先,NIC(网卡)接收和处理网络数据包。网卡有自己的硬件数据缓冲区,当网络数据流量太大,大到超过网卡的接收数据硬件缓冲区,这时新进入的数据包将覆盖之前缓冲区的数据包,导致丢失。网卡是否丢包取决于网卡本身的计算性能和硬件缓冲区的大小。

其次,网卡处理后把数据包交给操作系统缓冲区。数据包在操作系统阶段丢包主要取决于以下因素:

  • 操作系统缓冲区的大小
  • 系统的性能
  • 系统的负载
  • 网络相关的系统负载

最后,当数据包到达应用程序的socket缓冲区,如果应用程序不能及时从socket缓冲区把数据包取走,累积的数据包将会超出应用程序socket缓冲区阀值,导致缓冲区溢出,数据包丢失。数据包在应用程序阶段丢包主要取决于以下因素:

  • 应用程序缓冲区大小
  • 应用程序处理数据包的能力,即如何尽可能快的从缓冲区取走数据包

方案

我采用netty5来进行udp的网络通讯,将接收到的数据保存到BlockingQueue中,然后读取BlockingQueue中的数据,取到100条就存到hbase数据库中。

3. 针对UDP丢包问题,进行系统层面和程序层面调优

部分代码

  • ### 初始化netty

 

int DEFAULT_PORT = 6000;
EventLoopGroup group = new NioEventLoopGroup();
  try {
   Bootstrap bootstrap = new Bootstrap();
   bootstrap.group(group).channel(NioDatagramChannel.class).option(ChannelOption.SO_BROADCAST, true)
     .handler(new UdpServerHandler());
   Channel channel = bootstrap.bind(DEFAULT_PORT).sync().channel();
   channel.closeFuture().await();
   LOGGER.info("netty初始化成功!");
  } catch (InterruptedException e) {
   e.printStackTrace();
  } finally {
   group.shutdownGracefully();
  }

 

  

 

  • ### 接收udp数据

 

public BlockingQueue<Map<String, Object>> queue = 
new LinkedBlockingQueue<Map<String, Object>>(990000);
protected void messageReceived(ChannelHandlerContext ctx, DatagramPacket msg) throws Exception {
   // 因为Netty对UDP进行了封装,所以接收到的是DatagramPacket对象。
   String result = msg.content().toString(CharsetUtil.UTF_8);

   Map<String, Object> getMap = new HashMap<String, Object>();
//处理数据


queue.put(getMap);


ctx.writeAndFlush(new DatagramPacket(
Unpooled.copiedBuffer("结果:", CharsetUtil.UTF_8), msg.sender()));
  }

 

  • ### 读取数据存hbase

public void getDate() {
  LOGGER.info("开始取数据");
  List<Map<String, Object>> jsonList = new ArrayList<Map<String, Object>>();
   while (true) {
    Map<String, Object> takeMap = null;
    try {
     takeMap = queue.take();
     if (takeMap == null) {
      continue;
     }
     jsonList.add(takeMap);
     if (jsonList.size() == 100) {
      String httpJson = HbaseUtil.toHttpJson(vo.getTableName(), jsonList);
      LOGGER.info(httpJson);
      List<HbaseDataEntity> hbaseDatas =ParseJson.getData(httpJson);
      HbaseAPI.insertDataList(hbaseDatas);
      jsonList.clear();
      LOGGER.info("hbase存了100条");
     }
    } catch (Exception e) {
     jsonList.clear();
     continue;
    }
   }

  }