初学linux服务器开发时,我们的服务器是很简单的,只需要一个程序完成与客户端的连接,接收客户端数据,数据处理,向客户端发送数据。
但是在处理量很大的情况下,一台机器不能满足我们的需求,此时我们应该怎么办。
我们可以将服务端的任务分摊到多台机器上完成,见下图
从图中可见,此时整个服务端主要分为了三部分。
网关服务器:负责连接客户端与逻辑服务器,在两者间完成数据转发,使用负载均衡算法保证每个逻辑服务器的工作量均衡,以及进行数据加密。
逻辑服务器:负责业务逻辑的处理,与网关服务器进行数据交互,同时与数据库服务器进行数据交互。
数据库服务器:数据存储与读取的具体执行者。
当我们需要用到网关服务器来负载均衡时,我可以假定我们需要处理的客户端请求是很多的(当然,我这里只是为了学习,具体业务并不需要),也就是说我们需要高并发,高效处理。
因为网关服务器在客户端和逻辑服务器间相当于纽带的作用,所有的数据包都要从此经过,所以我们的网关服务器必须要保证可以高效的处理大量连接上的事件。
如上所说,如果网关服务器被恶意发起连接,一旦挂掉,我们的全部服务都会终止,因此我们必须要对这种情况进行处理。同时,还有与客户端交互时的数据加密,这个事也是要交给网关服务器来进行的。逻辑服务器一般都会与网关服务器配置于同一个局域网,所以通常不需要考虑数据的加密。
逻辑服务器和客户端都会连接在网关服务器上,而网关服务器需要对其sockfd进行标识,要知晓究竟谁是服务器,谁是客户端,而且要对客户端的连接加一条可检索属性(比如用户名).
为什么呢?因为对于客户端发送过来的数据,我们无论转到哪个逻辑服务器上都可以,而逻辑服务器返回的数据,我们需要知道要将该数据返回给哪个客户端,逻辑服务器并不能知道每个客户端的sockfd是多少。
我们不会去为每个sockfd都分配一个线程去服务它,我们更需要有一个线程可以去监听所有的fd上的事件,如果发生,我们再去分配线程去处理他。这就是多路复用。
多路复用有select poll epoll,几乎凡是知道多路复用的人都知道epoll的高效。因为其底层红黑树,以及回调机制,是我们最好的选择(在大量连接,活跃量不高的情况下)。
而epoll分两种工作模式,LT和ET,LT模式下,epoll只是一个高效的poll,ET模式下会更高效。事实上众多的第三方库都使用的是LT模式,说白了就是性价比,LT已经很高效,而改用ET模式,除了效率会更高,也会给编写带来一些复杂性以及产生一些头疼的问题,而处理这些特殊情况也需要时间,处理方式不当的话反而还不如LT,所以,总而言之,性价比不高。(本人为了学习,此处使用的et模式)。
每个连接的sockfd,我们都有两种操作其的方式,阻塞和非阻塞,阻塞意味着我们此刻必须对sockfd进行等待,就是说我们不能去干别的事,这显然不可以。因此,在以高并发为目标的服务器程序里,非阻塞是我们唯一的选择。
并且,et模式下,必须非阻塞,不然会产生套接字饿死的情况。
非阻塞模式下,我们还需要一样东西,就是缓冲区,因为你并不能保证你接受到的数据就是完整的。
这里使用的是多线程Reacter半同步半异步模式。
主线程负责监听以及接收新的连接,维护一个任务队列,其余线程从任务队列里获取任务并完成,同时也将新的任务添加进任务队列。
总体分为以下部分
程序主线程:监听fd绑定、监听,epoll监听
客户端和逻辑服务器的连接的封装
实现对连接的操作:
HandleRead()读, HandleWrite()写, Worker()数据处理, shutdown()连接关闭,getData()从用户缓冲区获取数据,puttData()将数据写入用户缓冲区
线程池的封装
任务队列的封装
实现队列的添加取出,以及同步加锁等处理
用户缓存区的封装
基本函数的封装:如 setNoBlocking(), addFd()…
工具类
//
// GataMain.cpp
// QuoridorServer
//
// Created by shiyi on 2016/12/2.
// Copyright © 2016年 shiyi. All rights reserved.
//
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "Util.h"
#include "ThreadPool.h"
#include "Connection.h"
#include "BaseFunc.h"
static const char *IP = "10.105.44.34";
// static const char *IP = "127.0.0.1";
// static const char *IP = "182.254.243.29";
static const int PORT = 11111;
//处理的最大连接数
static const int USER_PROCESS = 655536;
//epoll能监听的最大事件
static const int MAX_EVENT_NUMBER = 10000;
//信号通信的管道
static int sigPipefd[2];
//信号回调函数
static void sigHandler(int sig)
{
int saveErrno = errno;
send(sigPipefd[1], (char*)&sig, 1, 0);
errno = saveErrno;
}
//添加信号回调
static void addSig(int sig, void(handler)(int), bool restart = true)
{
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = handler;
if(restart)
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);
if(-1 == sigaction(sig, &sa, NULL))
Util::outError("sigaction");
}
static int setupSigPipe()
{
//新建epoll监听表和事件管道
int epollfd = epoll_create(USER_PROCESS);
if(epollfd == -1)
Util::outError("epoll_create");
int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sigPipefd);
assert(ret == 0);
//将写设置为非阻塞
setNoBlocking(sigPipefd[1]);
addFd(epollfd, sigPipefd[0], EPOLLIN | EPOLLET);
setNoBlocking(sigPipefd[0]);
//设置信号处理函数
addSig(SIGCHLD, sigHandler);
addSig(SIGTERM, sigHandler);
addSig(SIGINT, sigHandler);
addSig(SIGPIPE, sigHandler);
return epollfd;
}
int main()
{
int ret;
//构造协议地址结构
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = PF_INET;
inet_pton(PF_INET, IP, &address.sin_addr);
address.sin_port = htons(PORT);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert( listenfd >= 0 );
int opt = 1;
if(setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (void*)&opt, sizeof(int)) <0)
{
perror("setsockopt");
exit(1);
}
ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
if(ret == -1)
{
perror("bind");
}
if(listen(listenfd, 1000) <0)
{
perror("listen");
exit(1);
}
Connection *users = new Connection[USER_PROCESS];
ThreadPool threadPool;
//统一事件源
int epollfd = setupSigPipe();
epoll_event events[MAX_EVENT_NUMBER];
// addFd(epollfd, listenfd, EPOLLIN | EPOLLET);
addFd(epollfd, listenfd, EPOLLIN);
// setNoBlocking(m_listenfd);
bool isRunning = true;
while(isRunning)
{
int num = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
//如果错误原因不是被中断,则循环退出
if((num <0) && (errno != EINTR))
{
Util::outError("epoll_wait failure");
break;
}
for(int i=0; i {
int sockfd = events[i].data.fd;
//处理新的请求
if(sockfd == listenfd)
{
//连接新的请求
struct sockaddr_in clientAddr;
socklen_t clientLen = sizeof(clientAddr);
int cOnnfd= accept(listenfd, (struct sockaddr*)&clientAddr, &clientLen);
if(connfd <0)
{
Util::outError("accept");
break;
}
Util::outMsg("accept a new client : %d %s\n", connfd, inet_ntoa(clientAddr.sin_addr));
addFd(epollfd, connfd, EPOLLIN | EPOLLET | EPOLLONESHOT);
setNoBlocking(connfd);
//初始化客户端链接
users[connfd].init(epollfd, connfd, clientAddr);
}
//处理信号
else if((sockfd == sigPipefd[0]) && (events[i].events & EPOLLIN))
{
char sigMsg[1024];
int ret = recv(sockfd, sigMsg, sizeof(sigMsg), 0);
if(ret <= 0)
{
continue;
}
for(int j=0; j {
//循环处理每个信号
switch(sigMsg[j])
{
case SIGCHLD:
{
break;
}
case SIGTERM:
case SIGINT:
{
//退出
Util::outMsg("程序退出\n");
isRunning = false;
break;
}
}
}
}
//处理读事件
else if(events[i].events & EPOLLIN)
{
//向任务队列添加读任务
threadPool.AddTask(std::bind(&Connection::HandleRead, users+sockfd));
}
//处理写事件
else if(events[i].events & EPOLLOUT)
{
// cout<<"hello"<
threadPool.AddTask(std::bind(&Connection::HandleWrite, users+sockfd));
}
}
}
delete[] users;
close(sigPipefd[0]);
close(sigPipefd[1]);
close(epollfd);
return 0;
}
//
// Connection.h
// QuoridorServer
//
// Created by shiyi on 2016/12/2.
// Copyright © 2016年 shiyi. All rights reserved.
//
#ifndef Connection_H
#define Connection_H
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "Buffer.h"
#include "Util.h"
#include "BaseFunc.h"
#include "json/json.h"
const std::string serverIP[] = {
"127.0.0.1",
"182.254.243.29"
};
const size_t BUFFER_SIZE = 65535;
class Connection
{
public:
static std::vector serverConnVt;
static std::map<string, Connection*> clientConnMap;
static int serverIdx;
static Connection* getServerConn()
{
int size = serverConnVt.size();
if(size == 0)
return NULL;
serverIdx = (serverIdx+1)%size;
return serverConnVt[serverIdx];
}
Connection() : m_writeing(true), m_epollfd(-1), m_sockfd(-1)
{}
~Connection(){}
//初始化连接
void init(int epollfd, int sockfd, const sockaddr_in& clientAddr)
{
//初始化读写缓冲区
m_inBuff.init();
m_outBuff.init();
m_epollfd = epollfd;
m_sockfd = sockfd;
m_writeing = true;
m_address = clientAddr;
m_username = "";
m_type = -1;
std::string sip(inet_ntoa(clientAddr.sin_addr));
for(auto& ip : serverIP)
{
if(ip.compare(0, sip.size(), sip) == 0)
{
m_type = 1;
serverConnVt.push_back(this);
cout<"是服务端"
<//
// Buffer.h
// QuoridorServer
//
// Created by shiyi on 2016/12/2.
// Copyright © 2016年 shiyi. All rights reserved.
//
#ifndef Buffer_H
#define Buffer_H
#include
#include
#include
using namespace std;
class Buffer
{
public:
Buffer() : m_widx(0), m_ridx(0)
{}
~Buffer(){}
void init()
{
m_widx = m_ridx = 0;
m_buf.clear();
}
//增加内容
void PutData(char *data, int len)
{
//如果调整空间后足够存放,则进行调整
int capa = m_buf.capacity();
if(capa len + m_widx - m_ridx)
adjust();
for(int i = 0; i m_buf.push_back(data[i]);
m_widx += len;
}
//返回获取的包的大小,数据不完整返回-1
int GetData(char* data)
{
if(m_widx - m_ridx <4)
return -1;
int len;
char *t = (char*)&len;
for(int i=0; i<4; i++)
{
t[i] = m_buf[m_ridx+i];
}
//printf("-=-=%d\n", len);
if(len+4 > m_widx-m_ridx)
return -1;
m_ridx += 4;
for(int i = 0; i {
data[i] = m_buf[m_ridx++];
}
if(m_ridx >= m_widx)
{
m_ridx = m_widx = 0;
m_buf.clear();
}
return len;
}
//返回Buffer内全部内容
int GetDataAll(char* data)
{
int len = m_widx-m_ridx;
for(int i = 0; i {
if(m_ridx >= m_widx)
break;
data[i] = m_buf[m_ridx++];
}
if(m_ridx >= m_widx)
{
m_ridx = m_widx = 0;
m_buf.clear();
}
return len;
}
private:
//将数据移至容器头部,充分利用空间
void adjust()
{
vector<char> t(m_buf.begin()+m_ridx, m_buf.begin()+m_widx);
m_widx -= m_ridx;
m_ridx = 0;
m_buf.clear();
for(int i=0; i m_buf.push_back(t[i]);
}
private:
int m_ridx;
int m_widx;
std::vector<char> m_buf;
};
#endif /* Buffer_H */
//
// ThreadPool.h
// QuoridorServer
//
// Created by shiyi on 2016/11/30.
// Copyright © 2016年 shiyi. All rights reserved.
//
#ifndef ThreadPool_H
#define ThreadPool_H
#include
#include
#include
#include
#include
#include "SyncQueue.h"
const int MaxTaskCount = 100;
class ThreadPool
{
public:
using Task = std::function<void()>;
ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount)
{
if(numThreads <4)
numThreads = 4;
printf("线程池启动-%d线程\n", numThreads);
Start(numThreads);
}
~ThreadPool()
{
Stop();
}
void Stop()
{
std::call_once(m_flag, [this]{
StopThreadGroup();
});
}
void AddTask(Task&& task)
{
m_queue.Push(std::forward(task));
}
void AddTask(const Task& task)
{
m_queue.Push(task);
}
private:
void Start(int numThreads)
{
m_running = true;
//创建线程组
for(int i=0; i {
m_threadGroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
}
}
void RunInThread()
{
while(m_running)
{
std::queue queue;
m_queue.Take(queue);
std::queue<int> a;
while(!queue.empty())
{
if(!m_running)
return;
auto task = queue.front();
queue.pop();
task();
}
}
}
void StopThreadGroup()
{
m_queue.Stop();
m_running = false;
for(auto thread : m_threadGroup)
{
thread->join();
}
m_threadGroup.clear();
}
private:
SyncQueue m_queue; //同步队列
std::vector<std::shared_ptr<std::thread>> m_threadGroup; //处理任务的线程组
atomic_bool m_running; //是否停止
std::once_flag m_flag;
};
#endif /* ThreadPool_H */
//
// SyncQueue.h
// QuoridorServer
//
// Created by shiyi on 2016/11/30.
// Copyright © 2016年 shiyi. All rights reserved.
//
#ifndef SyncQueue_H
#define SyncQueue_H
#include
#include
#include
#include
#include
#include
using namespace std;
template <typename T>
class SyncQueue
{
public:
SyncQueue(int maxSize) : m_maxSize(maxSize), m_isStop(false)
{
}
~SyncQueue(){}
void Push(const T& x)
{
Add(x);
}
void Push(T&& x)
{
Add(x);
}
void Take(T& t)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty(locker, [this]{
return m_isStop || m_notEmpty();
});
if(m_isStop)
return;
t = m_queue.front();
m_queue.pop();
m_notFull.notify_one();
}
void Take(std::queue & queue)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker, [this]{
return m_isStop || NotEmpty();
});
if(m_isStop)
return;
queue = std::move(m_queue);
m_notFull.notify_one();
}
void Stop()
{
{
std::lock_guard<std::mutex> locker(m_mutex);
m_isStop = true;
}
m_notFull.notify_all();
m_notEmpty.notify_all();
}
bool Empty()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.empty();
}
bool Full()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size() >= m_maxSize;
}
private:
bool NotFull()
{
bool full = m_queue.size() >= m_maxSize;
if(full)
cout<<"缓冲区满,需要等待..."< return !full;
}
bool NotEmpty()
{
bool empty = m_queue.empty();
if(empty)
cout<<"缓冲区空,需要等待..."< return !empty;
}
template<typename F>
void Add(F&& x)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notFull.wait(locker, [this]{
return m_isStop || NotFull();
});
if(m_isStop)
return;
m_queue.push(std::forward(x));
m_notEmpty.notify_one();
}
private:
bool m_isStop; //是否停止
int m_maxSize; //同步队列最大的长度
std::queue m_queue; //缓冲区
std::mutex m_mutex; //互斥量
std::condition_variable m_notEmpty; //不为空的条件变量
std::condition_variable m_notFull; //不满的条件变量
};
#endif /* SyncQueue_H */