Abbreviated Muduo Library (8): tcpconnection

Look, the future 2021-09-15 09:37:34

lately VScode broken , Somehow I can't connect to the virtual machine , Very afflictive .
It has been decided not to Linux The problem of , Because with cmd You can connect remotely .
So this one uses VS Hold on first , I can't see clearly when I report a lot of mistakes .


CallBack.hpp

Store some callback declarations .

#pragma once
#include <functional>
class Buffer;
class TcpConnection;
class timestamp;
using TcpConnectionptr = std::shared_ptr<TcpConnection>;
using ConnectionCallback = std::function<void (const TcpConnectionptr&)>;
using CloseCallback = std::function<void (const TcpConnectionptr&)>;
using WriteCompleteCallback = std::function<void (const TcpConnectionptr&)>;
using MessageCallback = std::function<void (const TcpConnectionptr&,Buffer*,timestamp)>;
using ThreadInitCallback = std::function<void(EventLoop*)>;

TcpConnection.hpp

#pragma once
#include "nocopyable.hpp"
#include "InetAddr.hpp"
#include "callback.hpp"
#include "buffer.hpp"
#include "timestamp.hpp"
#include <memory>
#include <string>
#include <atomic>
class Channel;
class EventLoop;
class Socket;
//TcpServer adopt Acceptor, When there is a new accept Function get connfd,TCPConnection Set callback , turn channel, Into the poller, call channel Callback operation of 
class TcpConnection :public nocpoyable, public std::enable_shared_from_this<TcpConnection>
{
 // Get the smart pointer of the current object 
public:
using TcpConnectionPtr = std::shared_ptr<TcpConnection>;
TcpConnection(EventLoop* loop,
const std::string& name,
int sockfd,
const InetAddress& localaddr,
const InetAddress& peeraddr);
~TcpConnection();
EventLoop* getLoop() const {
 return loop_; }
const std::string& name() const {
 return name_; }
const InetAddress& localAddress() const {
 return localaddr_; }
const InetAddress& peerAddress() const {
 return peeraddr_; }
bool connected() const {
 return state_ == kConnected; }
void send(const std::string &buf);
void shutdown();
void setConnectionCallback(const ConnectionCallback& cb)
{

connectionCallback_ = cb;
}
void setMessageCallback(const MessageCallback& cb)
{

messageCallback_ = cb;
}
void setWriteCompleteCallback(const WriteCompleteCallback& cb)
{

writeCompleteCallback_ = cb;
}
void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
{

highWaterMarkCallback_ = cb;
highWaterMark_ = highWaterMark;
}
void connectEstablished();
void connectDestroyed();
private:
enum State
{

kDisconnected,
kConnecting,
kConnected,
kDisconnecting
};
void setState(State s) {
 state_ = s; }
void handleRead(timestamp receiveTime);
void handleWrite();
void handleClose();
void handleError();
void sendInLoop(const void* message, size_t len);
void shutdownInLoop();
EventLoop* loop_; // Exist in subloop in 
const std::string name_;
std::atomic_int state_;
bool reading_;
std::unique_ptr<Socket> socket_;
std::unique_ptr<Channel> channel_;
const InetAddress localaddr_;
const InetAddress peeraddr_;
Buffer inputBuffer_;
Buffer outputBuffer_;
size_t highWaterMark_;
ConnectionCallback connectionCallback_;
MessageCallback messageCallback_;
WriteCompleteCallback writeCompleteCallback_;
ThreadInitCallback threadInitCallback_;
HighWaterMarkCallback highWaterMarkCallback_;
CloseCallback closeCallback_;
};

TcpConnection.cc

#include "logger.hpp"
#include "tcpconnection.hpp"
#include "eventloop.hpp"
#include "channel.hpp"
#include "socket.hpp"
#include "eventloop.hpp"
#include <functional>
#include <errno.h>
static EventLoop* CheckLoopNotNull(EventLoop* loop)
{

if (loop == nullptr)
{

LOG_FATAL("%s:%s:%d mainloop is null!\n", __FILE__, __FUNCTION__, __LINE__);
}
return loop;
}
TcpConnection::TcpConnection(EventLoop* loop,
const std::string& nameArg,
int sockfd,
const InetAddress& localaddr,
const InetAddress& peeraddr)
: loop_(CheckLoopNotNull(loop)),
name_(nameArg),
state_(kConnecting),
reading_(true),
socket_(new Socket(sockfd)),
channel_(new Channel(loop, sockfd)),
localaddr_(localaddr),
peeraddr_(peeraddr),
highWaterMark_(64 * 1024 * 1024)
{

channel_->setReadCallback(
std::bind(&TcpConnection::handleRead, this, std::placeholders::_1));
channel_->setWriteCallback(
std::bind(&TcpConnection::handleWrite, this));
channel_->setCloseCallback(
std::bind(&TcpConnection::handleClose, this));
channel_->setErrorCallback(
std::bind(&TcpConnection::handleError, this));
LOG_DEBUG("TcpConnection::ctor[%s] at %p fd= %d \n", name_, this, sockfd);
//socket_->setKeepAlive(true);
}
TcpConnection::~TcpConnection()
{
}
void TcpConnection::handleRead(timestamp receiveTime)
{

int saveerrno = 0;
ssize_t n = inputBuffer_.readFD(channel_->fd(), &saveerrno);
if (n > 0)
{

messageCallback_(std::shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{

errno = saveerrno;
LOG_ERROR("TcpConnection::handleRead\n");
handleError();
}
}
void TcpConnection::handleWrite()
{

if (channel_->isWriting())
{

int saveErrno = 0;
ssize_t n = outputBuffer_.writeFD(channel_->fd(), &saveErrno);
if (n > 0)
{

outputBuffer_.retrieve(n);
if (outputBuffer_.readablebuffer() == 0)
{

channel_->disableWriting();
if (writeCompleteCallback_)
{

loop_->queueInLoop(std::bind(writeCompleteCallback_, std::shared_from_this()));
}
if (state_ == kDisconnecting)
{

shutdownInLoop();
}
}
}
else
{

LOG_ERROR("TcpConnection::handleRead\n");
}
}
else
{

LOG_ERROR("Connection fd = %d is done\n", channel_->fd());
}
}
void TcpConnection::handleClose()
{

setState(kDisconnected);
channel_->disableAll();
TcpConnectionPtr connptr(std::shared_from_this());
connectionCallback_(connptr);
closeCallback_(connptr);
}
void TcpConnection::handleError()
{

int optval;
socklen_t optlen = sizeof optval;
int err;
if (::getsockopt(channel_->fd(), SOL_SOCKET, optval, &optlen) < 0) {

err = errno;
}
else {

err = 0;
}
LOG_ERROR("TcpConnection::handleError [%s] - SO_ERROR = %d\n", name_.c_str(), err);
}
void TcpConnection::send(const std::string &buf) {

if (state_ == kConnected) {

if(loop_->isInLoopThread()) {

sendInLoop(buf.c_str(),buf.size());
}
else {

loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size()));
}
}
}
// Applications write fast , The kernel writes slowly , So you need a buffer , And set the water level , Prevent writing too fast 
void TcpConnection::sendInLoop(const void* message, size_t len)
{

ssize_t wrote = 0;
ssize_t remaining = len;
bool faulterr = false;
if (state_ == kDisconnected) {

LOG_ERROR("disconnected,give up writing! \n");
return;
}
//channel Start writing data for the first time , And there is no data to be sent in the buffer 
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {

wrote = ::write(channel_->fd(), message, len);
if (wrote >= 0) {

remaining = len - wrote;
// The data is sent at one time , No more right poller Set up epollout event 
if (remaining == 0 && WriteCompleteCallback_) {

loop_->queueInLoop(std::bind(WriteCompleteCallback_,shared_from_this()));
}
}
else {

wrote = 0;
if (errno != EWOULDBLOCK) {

LOG_ERROR("TcpConnect::sendInLoop\n");
if (errno == EPIPE || errno == ECONNRESET) {

faulterr = true;
}
}
}
}
// Explain this write Didn't send all the data , The remaining data needs to be saved to the buffer ,
// to channel register epollout event ,poller Will notify the appropriate sock->channel Call the corresponding callback method 
if (!faulterr && wrote > 0) {

size_t oldLen = outputBuffer_.readableBytes();
if (oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{

loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
outputBuffer_.append(static_cast<const char*>(data) + nwrote, remaining);
if (!channel_->isWriting())
{

channel_->enableWriting();
}
}
}
void TcpConnection::connectEstablished()
{

setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading();
connectionCallback_(shared_from_this());
}
void TcpConnection::connectDestroyed()
{

if (state_ == kConnected) {

setState(kDisconnected);
channel_->disableAll();
ConnectionCallback_(shared_from_this());
}
channel_->remove();
}
void TcpConnection::shutdown()
{

if (state_ == kConnected){

setState(kDisconnecting);
// FIXME: shared_from_this()?
loop_->runInLoop(std::bind(&TcpConnection::shutdownInLoop, this));
}
}
void TcpConnection::shutdownInLoop()
{

if (!channel_->isWriting())
{

// we are not writing
socket_->shutdownWrite();
}
}
Please bring the original link to reprint ,thank
Similar articles

2021-09-15

2021-09-15

2021-09-15

2021-09-15