package main
import (
"net"
"os"
"github.com/gpmgo/gopm/modules/goconfig"
"github.com/go-stomp/stomp"
"time"
"strconv"
"log"
"strings"
)
// 限制goroutine数量
var limitChan = make(chan bool, 10000) // Todo 从配置文件中读取
// 限制同时处理消息数量
var msgChan = make(chan string, 10000) // Todo 从配置文件中读取
var activeMqLimitedChan = make(chan bool, 100)
var activeMq *stomp.Conn
var activeQueue string
var host string
var port string
var cOnnectTimes= 0
var udpAddress = "0.0.0.0" // Todo 从配置文件中读取
var udpPort = "514" // Todo 从配置文件中读取
var logFilePath = "/var/log/syslog_server/"
var cOnfigFilePath= "./config.ini"
// UDP goroutine 实现并发读取UDP数据
func udpProcess(conn *net.UDPConn) {
defer func() {
if e := recover(); e != nil {
// 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
defer logFile.Close()
if err != nil {
log.Fatalln("open log file error: ", err.Error())
}
logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)
// 记录错误日志
logger.Println("udpProcess error:", e)
}
// 释放出一个协程
<- limitChan
}()
// 最大读取数据大小
data := make([]byte, 1024)
n, _, err := conn.ReadFromUDP(data)
if err != nil {
panic(err)
}
// 获取对端的IP地址
// remoteAddr := conn.RemoteAddr()
// msgChan <- remoteAddr.String() + " " + string(data[:n])
msgChan <- string(data[:n])
}
func udpServer(address, port string) {
// @todo 如何防止udpServer 一直Panic导致无限循环重启
defer func() {
if e := recover(); e != nil {
// 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
defer logFile.Close()
if err != nil {
log.Fatalln("open log file error: ", err.Error())
}
logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)
// 记录错误日志
logger.Println("udpServer error:", e)
// udpServer启动失败后,间隔10秒后重试
time.Sleep(10 * time.Second)
udpServer(udpAddress, udpPort)
}
}()
udpAddr, err := net.ResolveUDPAddr("udp", net.JoinHostPort(address, port))
conn, err := net.ListenUDP("udp", udpAddr)
defer conn.Close()
if err != nil {
panic(err)
}
for {
limitChan <- true
go udpProcess(conn)
}
}
// 读取ActiveMQ配置信息
func getConfiguration(){
defer func() {
if e := recover(); e != nil {
// 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
defer logFile.Close()
if err != nil {
log.Fatalln("open log file error: ", err.Error(), ", programing exit.")
os.Exit(1)
}
logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)
// 记录错误日志
logger.Println("Get Configuration error:", e)
}
}()
configFile, err := goconfig.LoadConfigFile(configFilePath)
if err != nil {
panic(err)
}
host, err = configFile.GetValue("active_mq", "host")
if err != nil {
// 如果没有配置主机,则使用本地主机
host = "127.0.0.1"
}
port, err = configFile.GetValue("active_mq", "port")
if err != nil {
// 如果没配置端口,则使用默认端口
port = "61613"
}
activeQueue, err = configFile.GetValue("active_mq", "queue")
if err != nil {
// 如果没配置端口,则使用默认队列名
activeQueue = "syslog.queue"
}
}
// 使用IP和端口连接到ActiveMQ服务器, 返回ActiveMQ连接对象
func connActiveMq(){
// @todo 如何防止无限循环
defer func() {
if e := recover(); e != nil {
// 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
defer logFile.Close()
if err != nil {
log.Fatalln("open log file error: ", err.Error())
}
logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)
// 记录错误日志
logger.Println("connActiveMq error:", e)
// ActiveMQ服务器连接失败后,间隔3秒后重试
time.Sleep(3 * time.Second)
activeMq = nil
connActiveMq()
}
}()
// @todo 实现断开重连
if activeMq == nil {
var err error
activeMq, err = stomp.Dial("tcp", net.JoinHostPort(host, port))
if err != nil {
connectTimes ++
if connectTimes >= 100 {
time.Sleep(60 * time.Second)
}else if connectTimes >= 10 {
time.Sleep(10 * time.Second)
}else {
time.Sleep(3 * time.Second)
}
panic(err.Error() + ", 重新连接ActiveMQ, 已重试次数: " + strconv.Itoa(connectTimes))
}else {
connectTimes = 0
}
}
}
func activeMqProducer(c chan string){
// @todo 如何防止activeMqProducer 退出
defer func() {
if e := recover(); e != nil {
// 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
defer logFile.Close()
if err != nil {
log.Fatalln("open log file error: ", err.Error())
}
logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)
// 记录错误日志
logger.Println("activeMqProducer error:", e)
// 重试
go activeMqProducer(msgChan)
}
}()
for{
activeMqLimitedChan <- true // 限制开启协程数量
contentMsg := <-c
go func() {
defer func() {
if e := recover(); e != nil {
err := os.MkdirAll(logFilePath, 777)
log.Fatalln("create log dirctory error: ", err.Error())
// 初始化日志,每天生成一个日志文件,日志文件名以日志结尾
logFileName := logFilePath + "server-" + strings.Split(time.Now().String(), " ")[0] + ".log"
logFile, err := os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0766) // 应该判断error,此处简略
defer logFile.Close()
if err != nil {
log.Fatalln("open log file error: ", err.Error())
}
logger := log.New(logFile, "[Error] ", log.Ldate|log.Ltime|log.Lshortfile)
// 记录错误日志
logger.Println("activeMqProducer error:", e)
}
// 释放出一个协程
<- activeMqLimitedChan
}()
err := activeMq.Send(activeQueue, "text/plain", []byte(contentMsg))
if err != nil {
if err.Error() == "connection already closed"{
activeMq = nil
connActiveMq()
activeMq.Send(activeQueue, "text/plain", []byte(contentMsg))
}
panic(err)
}
}()
}
}
func init(){
// 初始化 ActiveMQ 配置
getConfiguration()
// 连接到 ActiveMQ 服务器
connActiveMq()
// 启动一个协程将Syslog消息放入ActiveMQ队列中
go activeMqProducer(msgChan)
}
func main() {
defer activeMq.Disconnect()
udpServer(udpAddress, udpPort)
}