亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定

Java使用UDP發送數據到InfluxDB

標簽:
Java

最近在做压测引擎相关的开发需要将聚合数据发送到InfluxDB保存以便实时分析和控制QPS。

下面介绍对InfluxDB的使用。

什么是InfluxDB

InfluxDB是一款用Go语言编写的开源分布式时序、事件和指标数据库无需外部依赖。该数据库现在主要用于存储涉及大量的时间戳数据如DevOps监控数据APP metrics, loT传感器数据和实时分析数据。

InfluxDB特征

  • 无结构(无模式)可以是任意数量的列(tags)。

  • 可以设置metric的保存时间。

  • 支持与时间有关的相关函数(如min、max、sum、count、mean、median等)方便统计。

  • 支持存储策略可以用于数据的删改(influxDB没有提供数据的删除与修改方法)。

  • 支持连续查询是数据库中自动定时启动的一组语句和存储策略搭配可以降低InfluxDB的系统占用量。

  • 原生的HTTP支持内置HTTP API。

  • 支持类似SQL语法。

  • 支持设置数据在集群中的副本数。

  • 支持定期采样数据写入另外的measurement方便分粒度存储数据。

  • 自带web管理界面方便使用(登入方式http://< InfluxDB-IP >:8083)。

  • 支持Grafana画图展示。

PS有了InfluxDB+Grafana后你就可以写一些简单的程序了可以只负责写后端逻辑部分数据都可以存入InfluxDB然后通过Grafana展示出来。

Mac安装InfluxDB

# 安装brew install influxdb# 启动influxd -config /usr/local/etc/influxdb.conf# 查看influxdb运行配置influxd config# 启动客户端influx -precision rfc3339

InfluxDB开启UDP配置

vim /usr/local/etc/influxdb.conf

开启udp配置其他为默认值

[[udp]]
  enabled = true

udp配置含义

[[udp]] – udp配置

    enabled是否启用该模块默认值false。

    bind-address绑定地址默认值”:8089″。

    database数据库名称默认值”udp”。

    retention-policy存储策略无默认值。

    batch-size默认值5000。

    batch-pending默认值10。

    read-bufferudp读取buffer的大小0表示使用操作系统提供的值如果超过操作系统的默认配置则会出错。 该配置的默认值0。

    batch-timeout超时时间默认值”1s”。

    precision时间精度无默认值。

Java发送UDP数据报

我们知道InfluxDB是支持Http的为什么我们还要采用UDP方式发送数据呢

基于下列原因

  1. TCP数据传输慢UDP数据传输快。

  2. 网络带宽需求较小而实时性要求高。

  3. InfluxDB和服务器在同机房发生数据丢包的可能性较小即使真的发生丢包对整个请求流量的收集影响也较小。

我们采用了worker线程调用addMetric方法将数据存储到缓存 map 中send线程池来进行每个指定时间发送数据到Influxdb。

代码如下(也可参考JmeterUdpMetricsSender类)

@Slf4jpublic class InfluxDBClient implements Runnable {    private String measurement = "example";    private final Object lock = new Object();    private InetAddress hostAddress;    private int udpPort;    private volatile Map<String, List<Response>> metrics = new HashMap<>();    private long time;    private String transaction;    public InfluxDBClient(String influxdbUrl, String transaction) {        this.transaction = transaction;        try {
            log.debug("Setting up with url:{}", influxdbUrl);
            String[] urlComponents = influxdbUrl.split(":");            if (urlComponents.length == 2) {
                hostAddress = InetAddress.getByName(urlComponents[0]);
                udpPort = Integer.parseInt(urlComponents[1]);
            } else {                throw new IllegalArgumentException("InfluxDBClient url '" + influxdbUrl + "' is wrong. The format shoule be <host/ip>:<port>");
            }
        } catch (Exception e) {            throw new IllegalArgumentException("InfluxDBClient url '" + influxdbUrl + "' is wrong. The format shoule be <host/ip>:<port>", e);
        }
    }    public void addMetric(Response response) {        synchronized (lock) {            if (metrics.containsKey(response.getLabel())) {
                metrics.get(response.getLabel()).add(response);
            } else {
                metrics.put(response.getLabel(), new ArrayList<>(Collections.singletonList(response)));
            }
        }
    }    @Override
    public void run() {
        sendMetrics();
    }    private void sendMetrics() {
        Map<String, List<Response>> tempMetrics;        //复制数据到tempMetrics清空原来metrics并初始化上次的大小
        synchronized (lock) {            if (isEmpty(metrics)) {                return;
            }
            time = System.currentTimeMillis();
            tempMetrics = metrics;
            metrics = new HashMap<>();            for (Map.Entry<String, List<Response>> entry : tempMetrics.entrySet()) {
                metrics.put(entry.getKey(), new ArrayList<>(entry.getValue().size()));
            }
        }        final Map<String, List<Response>> copyMetrics = tempMetrics;        final List<MetricTuple> aggregateMetrics = aggregate(copyMetrics);
        StringBuilder sb = new StringBuilder(aggregateMetrics.size() * 200);        //发送tempMetrics,生成一行数据然后换行
        for (MetricTuple metric : aggregateMetrics) {
            sb.append(metric.getMeasurement()).append(metric.getTag()).append(" ")
                    .append(metric.getField()).append(" ").append(metric.getTimestamp() + "000000").append("\n");
        }        //udp发送数据到Influxdb
        try (DatagramSocket ds = new DatagramSocket()) {            byte[] buf = sb.toString().getBytes();
            DatagramPacket dp = new DatagramPacket(buf, buf.length, this.hostAddress, this.udpPort);
            ds.send(dp);
            log.debug("send {} to influxdb", sb.toString());
        } catch (SocketException e) {
            log.error("Cannot open udp port!", e);
        } catch (IOException e) {
            log.error("Error in transferring udp package", e);
        }
    }    /**
     * 得到聚合数据
     *
     * @param metrics
     * @return
     */
    private List<MetricTuple> aggregate(Map<String, List<Response>> metrics) {

    }    public boolean isEmpty(Map<String, List<Response>> map) {        for (Map.Entry<String, List<Response>> entry : map.entrySet()) {            if (!entry.getValue().isEmpty()) {                return false;
            }
        }        return true;
    }
}

参考文档

  1. InfluxDB中文文档

  2. 玩转时序数据库InfluxDB

原文出处https://www.cnblogs.com/morethink/p/9693561.html  

點擊查看更多內容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優質文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學習,寫下你的評論
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學

大額優惠券免費領

立即參與 放棄機會
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號

舉報

0/150
提交
取消