热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Golang实现UDPServer并发送消息到ActiveMQ

示例代码packagemainimport("net""os"&am

示例代码

package main

import (
    "net"
    "os"
    "github.com/gpmgo/gopm/modules/goconfig"
    "github.com/go-stomp/stomp"
    "time"
    "strconv"
    "log"
    "strings"
)




// 限制goroutine数量
var limitChan = make(chan bool, 10000) // Todo 从配置文件中读取

// 限制同时处理消息数量
var msgChan = make(chan string, 10000) // Todo 从配置文件中读取
var activeMqLimitedChan = make(chan bool, 100)
var activeMq *stomp.Conn
var activeQueue string
var host string
var port string
var cOnnectTimes= 0
var udpAddress = "0.0.0.0"    // Todo 从配置文件中读取
var udpPort = "514"    // Todo 从配置文件中读取
var logFilePath = "/var/log/syslog_server/"
var cOnfigFilePath= "./config.ini"
// UDP goroutine 实现并发读取UDP数据
func udpProcess(conn *net.UDPConn) {
    defer func() {
        if e := recover(); e != nil {
            // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 记录错误日志
            logger.Println("udpProcess error:", e)
        }
        // 释放出一个协程
        <- limitChan
    }()

    // 最大读取数据大小
    data := make([]byte, 1024)
    n, _, err := conn.ReadFromUDP(data)
    if err != nil {
        panic(err)
    }

    // 获取对端的IP地址
    // remoteAddr := conn.RemoteAddr()
    // msgChan <- remoteAddr.String() + " " + string(data[:n])

    msgChan <- string(data[:n])


}


func udpServer(address, port string) {
    // @todo 如何防止udpServer 一直Panic导致无限循环重启
    defer func() {
        if e := recover(); e != nil {
            // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 记录错误日志
            logger.Println("udpServer error:", e)

            // udpServer启动失败后,间隔10秒后重试
            time.Sleep(10 * time.Second)
            udpServer(udpAddress, udpPort)
        }
    }()

    udpAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(address, port))
    conn, err := net.ListenUDP("udp", udpAddr)
    defer conn.Close()
    if err != nil {
        panic(err)
    }

    for {
        limitChan <- true
        go udpProcess(conn)
    }
}


// 读取ActiveMQ配置信息
func getConfiguration(){
    defer func() {
        if e := recover(); e != nil {
            // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error(), ", programing exit.")
                os.Exit(1)
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 记录错误日志
            logger.Println("Get Configuration error:", e)
        }
    }()

    configFile, err := goconfig.LoadConfigFile(configFilePath)
    if err != nil {
        panic(err)
    }

    host, err = configFile.GetValue("active_mq", "host")
    if err != nil {
        // 如果没有配置主机,则使用本地主机
        host = "127.0.0.1"
    }
    port, err = configFile.GetValue("active_mq", "port")
    if err != nil {
        // 如果没配置端口,则使用默认端口
        port = "61613"
    }

    activeQueue, err = configFile.GetValue("active_mq", "queue")
    if err != nil {
        // 如果没配置端口,则使用默认队列名
        activeQueue = "syslog.queue"
    }
}

// 使用IP和端口连接到ActiveMQ服务器, 返回ActiveMQ连接对象
func connActiveMq(){
    // @todo 如何防止无限循环
    defer func() {
        if e := recover(); e != nil {
            // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 记录错误日志
            logger.Println("connActiveMq error:", e)

            // ActiveMQ服务器连接失败后,间隔3秒后重试
            time.Sleep(3 * time.Second)
            activeMq = nil
            connActiveMq()
        }
    }()

    // @todo 实现断开重连
    if activeMq == nil {
        var err error
        activeMq, err = stomp.Dial("tcp", net.JoinHostPort(host, port))
        if err != nil {
            connectTimes ++
            if connectTimes >= 100 {
                time.Sleep(60 * time.Second)
            }else if connectTimes >= 10 {
                time.Sleep(10 * time.Second)
            }else {
                time.Sleep(3 * time.Second)
            }
            panic(err.Error() + ", 重新连接ActiveMQ, 已重试次数: " + strconv.Itoa(connectTimes))

        }else {
            connectTimes = 0
        }
    }
}


func activeMqProducer(c chan string){
    // @todo 如何防止activeMqProducer 退出
    defer func() {
        if e := recover(); e != nil {
            // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
            logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
            logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
            defer logFile.Close()
            if err != nil {
                log.Fatalln("open log file error: ", err.Error())
            }
            logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

            // 记录错误日志
            logger.Println("activeMqProducer error:", e)

            // 重试
            go activeMqProducer(msgChan)
        }
    }()
    for{
        activeMqLimitedChan <- true    // 限制开启协程数量
        contentMsg := <-c
        go func() {
            defer func() {
                if e := recover(); e != nil {
                    err := os.MkdirAll(logFilePath, 777)
                    log.Fatalln("create log dirctory error: ", err.Error())
                    // 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
                    logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
                    logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
                    defer logFile.Close()
                    if err != nil {
                        log.Fatalln("open log file error: ", err.Error())
                    }
                    logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)

                    // 记录错误日志
                    logger.Println("activeMqProducer error:", e)
                }
                // 释放出一个协程
                <- activeMqLimitedChan
            }()

            err := activeMq.Send(activeQueue, "text/plain", []byte(contentMsg))
            if err != nil {
                if err.Error() == "connection already closed"{
                    activeMq = nil
                    connActiveMq()
                    activeMq.Send(activeQueue, "text/plain", []byte(contentMsg))
                }
                panic(err)
            }
        }()

    }

}


func init(){
    // 初始化 ActiveMQ 配置
    getConfiguration()

    // 连接到 ActiveMQ 服务器
    connActiveMq()

    // 启动一个协程将Syslog消息放入ActiveMQ队列中
    go activeMqProducer(msgChan)

}

func main() {
    defer activeMq.Disconnect()
    udpServer(udpAddress, udpPort)
}

 


推荐阅读
  • Zabbix自定义监控与邮件告警配置实践
    本文详细介绍了如何在Zabbix中添加自定义监控项目,配置邮件告警功能,并解决测试告警时遇到的邮件不发送问题。 ... [详细]
  • 本文基于Java官方文档进行了适当修改,旨在介绍如何实现一个能够同时处理多个客户端请求的服务端程序。在前文中,我们探讨了单客户端访问的服务端实现,而本篇将深入讲解多客户端环境下的服务端设计与实现。 ... [详细]
  • 本文详细介绍了如何在Oracle VM VirtualBox中实现主机与虚拟机之间的数据交换,包括安装Guest Additions增强功能,以及如何利用这些功能进行文件传输、屏幕调整等操作。 ... [详细]
  • 本文详细介绍了在PHP中如何获取和处理HTTP头部信息,包括通过cURL获取请求头信息、使用header函数发送响应头以及获取客户端HTTP头部的方法。同时,还探讨了PHP中$_SERVER变量的使用,以获取客户端和服务器的相关信息。 ... [详细]
  • 本文详细介绍了如何在PHP中使用Memcached进行数据缓存,包括服务器连接、数据操作、高级功能等。 ... [详细]
  • 本文分享了作者在使用LaTeX过程中的几点心得,涵盖了从文档编辑、代码高亮、图形绘制到3D模型展示等多个方面的内容。适合希望深入了解LaTeX高级功能的用户。 ... [详细]
  • Go语言实现文件读取与终端输出
    本文介绍如何使用Go语言编写程序,通过命令行参数指定文件路径,读取文件内容并将其输出到控制台。代码示例中包含了错误处理和资源管理的最佳实践。 ... [详细]
  • 二维码的实现与应用
    本文介绍了二维码的基本概念、分类及其优缺点,并详细描述了如何使用Java编程语言结合第三方库(如ZXing和qrcode.jar)来实现二维码的生成与解析。 ... [详细]
  • 本文详细介绍了如何正确设置Shadowsocks公共代理,包括调整超时设置、检查系统限制、防止滥用及遵守DMCA法规等关键步骤。 ... [详细]
  • 本文探讨了如何通过Service Locator模式来简化和优化在B/S架构中的服务命名访问,特别是对于需要频繁访问的服务,如JNDI和XMLNS。该模式通过缓存机制减少了重复查找的成本,并提供了对多种服务的统一访问接口。 ... [详细]
  • Hadoop MapReduce 实战案例:手机流量使用统计分析
    本文通过一个具体的Hadoop MapReduce案例,详细介绍了如何利用MapReduce框架来统计和分析手机用户的流量使用情况,包括上行和下行流量的计算以及总流量的汇总。 ... [详细]
  • Maven + Spring + MyBatis + MySQL 环境搭建与实例解析
    本文详细介绍如何使用MySQL数据库进行环境搭建,包括创建数据库表并插入示例数据。随后,逐步指导如何配置Maven项目,整合Spring框架与MyBatis,实现高效的数据访问。 ... [详细]
  • 本文深入探讨了Go语言中的接口型函数,通过实例分析其灵活性和强大功能,帮助开发者更好地理解和运用这一特性。 ... [详细]
  • Go从入门到精通系列视频之go编程语言密码学哈希算法(二) ... [详细]
  • 问题场景用Java进行web开发过程当中,当遇到很多很多个字段的实体时,最苦恼的莫过于编辑字段的查看和修改界面,发现2个页面存在很多重复信息,能不能写一遍?有没有轮子用都不如自己造。解决方式笔者根据自 ... [详细]
author-avatar
手机用户2602892387
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有