Labs 1-4 build a stream reassembler, a TCP receiver, a TCP sender, and a TCP connection. Combined, the four parts form a complete TCP endpoint which, once wrapped up, can send and receive data over TCP.

All of the code is on GitHub.

Lab1 stream reassembler

This lab implements a stream reassembler: it is handed pieces of data together with their starting positions, reassembles them, and outputs the reassembled data as soon as possible.

Here I use a red-black tree, which is std::set in C++. Fragments that cannot be output yet are stored in the tree. When a new fragment arrives, it is merged with the existing fragments as far as possible, so that the tree never holds overlapping fragments.

Use lower_bound() to find where the new fragment belongs, then check whether the fragment before it can be merged and whether the fragments after it can be merged. Note that a new fragment may cover many existing fragments at once. This part of my code is rather messy: after writing it, testing showed cases I had not considered, so I patched it, and it became more and more tangled.

The condition for outputting as soon as possible is simple. If the newly arrived fragment can be output directly, output it, then check whether the first fragment in the tree can now be output as well. Because the stored fragments never overlap, only the first one needs to be checked.
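
The merge and output steps described above can be sketched roughly as follows. This is only a minimal sketch under assumptions, not the lab code: it uses std::map keyed by start index instead of std::set, ignores capacity and EOF handling, assumes overlapping bytes always agree, and the names FragmentStore, insert, and pop_ready are made up for illustration.

```cpp
#include <cstdint>
#include <iterator>
#include <map>
#include <string>
#include <utility>

// Hypothetical helper: stores non-overlapping fragments keyed by start index.
class FragmentStore {
    std::map<uint64_t, std::string> _frags;  // start index -> bytes
    uint64_t _next = 0;                      // first index not yet output

  public:
    // Insert data starting at `index`, merging with every fragment it touches.
    void insert(uint64_t index, std::string data) {
        // Drop the part that has already been output.
        if (index < _next) {
            if (index + data.size() <= _next) return;
            data.erase(0, _next - index);
            index = _next;
        }
        if (data.empty()) return;

        // First fragment starting at or after `index`; step back once if the
        // previous fragment reaches `index`, so that it gets merged too.
        auto it = _frags.lower_bound(index);
        if (it != _frags.begin()) {
            auto prev = std::prev(it);
            if (prev->first + prev->second.size() >= index) it = prev;
        }

        // Absorb every fragment that overlaps or touches [index, end).
        while (it != _frags.end() && it->first <= index + data.size()) {
            const uint64_t end = index + data.size();
            const uint64_t frag_end = it->first + it->second.size();
            if (it->first < index) {
                // Existing fragment starts earlier: keep its head.
                data.insert(0, it->second, 0, index - it->first);
                index = it->first;
            }
            if (frag_end > end) {
                // Existing fragment ends later: keep its tail.
                data.append(it->second, it->second.size() - (frag_end - end), std::string::npos);
            }
            it = _frags.erase(it);
        }
        _frags.emplace(index, std::move(data));
    }

    // Bytes that can be output now. Only the first fragment needs checking,
    // because stored fragments never overlap or touch.
    std::string pop_ready() {
        auto it = _frags.begin();
        if (it == _frags.end() || it->first != _next) return {};
        std::string out = std::move(it->second);
        _next += out.size();
        _frags.erase(it);
        return out;
    }
};
```

For example, inserting "def" at index 3, "gh" at index 6, and then "abcd" at index 0 leaves a single fragment, and pop_ready() returns "abcdefgh".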

Lab2 TCP receiver

This lab builds a TCP receiver on top of the stream reassembler from the previous lab, and it is relatively simple. It may expose bugs in the stream reassembler, in which case go back and fix that code.

The first step is to implement WrappingInt32. TCP sequence numbers are 32 bits wide and can wrap around, while the index used by the stream reassembler is 64 bits wide, so functions are needed to convert between the 64-bit absolute sequence number and the 32-bit wrapped sequence number, relative to the ISN.

#include "wrapping_integers.hh"

using namespace std;

WrappingInt32 wrap(uint64_t n, WrappingInt32 isn) {
    // Adding in 64 bits and truncating is the same as adding modulo 2^32
    uint64_t res = isn.raw_value() + n;
    return WrappingInt32{static_cast<uint32_t>(res)};
}

// Absolute difference of two unsigned values
uint64_t abs(uint64_t a, uint64_t b) {
    if (a > b) {
        return a - b;
    } else {
        return b - a;
    }
}

uint64_t unwrap(WrappingInt32 n, WrappingInt32 isn, uint64_t checkpoint) {
    // Upper 32 bits of the checkpoint
    uint64_t pre = checkpoint & 0xffffffff00000000;
    // Offset of n from isn, modulo 2^32
    uint64_t num;
    if (n.raw_value() >= isn.raw_value()) {
        num = n.raw_value() - isn.raw_value();
    } else {
        num = 0x0000000100000000;
        num += n.raw_value();
        num -= isn.raw_value();
    }
    // Three candidates 2^32 apart; return the one closest to the checkpoint
    uint64_t a = pre + num;
    uint64_t b = a + 0x0000000100000000;
    uint64_t c = a - 0x0000000100000000;
    // b a c
    if (abs(a, checkpoint) < abs(b, checkpoint)) {
        if (abs(a, checkpoint) < abs(c, checkpoint)) {
            return a;
        } else {
            return c;
        }
    } else {
        return b;
    }
}
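
A worked example may make the wraparound more concrete. The sketch below is not the lab code: it uses plain uint32_t values in place of WrappingInt32, the names wrap32 and unwrap32 are made up for illustration, and it ignores overflow for checkpoints close to 2^64. With an ISN of 0xFFFFFFFE, absolute sequence number 3 wraps to the 32-bit value 1, and unwrapping 1 gives 3 near checkpoint 0 but 2^32 + 3 near a checkpoint just past 2^32.

```cpp
#include <cstdint>

// Plain-integer stand-in for wrap(): add modulo 2^32.
uint32_t wrap32(uint64_t n, uint32_t isn) {
    return static_cast<uint32_t>(n + isn);
}

// Plain-integer stand-in for unwrap(): pick the 64-bit value that wraps to
// `n` and lies closest to `checkpoint`.
uint64_t unwrap32(uint32_t n, uint32_t isn, uint64_t checkpoint) {
    const uint64_t span = 1ull << 32;
    // Offset from the ISN, in [0, 2^32); unsigned subtraction wraps.
    const uint64_t offset = static_cast<uint32_t>(n - isn);
    // Start in the same 2^32 block as the checkpoint.
    uint64_t candidate = (checkpoint & ~(span - 1)) + offset;
    // Move one block up or down if that lands closer to the checkpoint.
    if (candidate + span / 2 < checkpoint) {
        candidate += span;
    } else if (candidate >= span && candidate - span / 2 > checkpoint) {
        candidate -= span;
    }
    return candidate;
}
```

The same 32-bit value therefore maps to different 64-bit indices depending on the checkpoint, which is why the receiver passes the reassembler's next expected index as the checkpoint.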

Finally, we wrap the stream reassembler, compute the ackno and window_size for later use, and handle the SYN and FIN flags.

#include "tcp_receiver.hh"

#include <iostream>

using namespace std;

void TCPReceiver::segment_received(const TCPSegment &seg) {
    if (!_isn.has_value()) {
        if (seg.header().syn) {
            // _isn holds the sequence number of the first data byte (SYN seqno + 1)
            _isn = seg.header().seqno + 1;
        } else {
            std::cerr << "Error: connection not established" << std::endl;
            return;
        }
    }
    bool eof = seg.header().fin;
    std::string payload = std::string(seg.payload().str());
    // Early in the stream, a non-SYN segment that reuses the SYN's seqno is invalid
    if (seg.header().seqno == _isn.value() - 1 && !seg.header().syn && _reassembler.expect() < 0x0000ffff) {
        // wrong packet seqno == isn
        return;
    }
    // For a SYN segment the payload starts one sequence number after the SYN
    uint64_t index = unwrap(seg.header().seqno + (seg.header().syn ? 1 : 0), _isn.value(), _reassembler.expect());
    _reassembler.push_substring(payload, index, eof);
}

optional<WrappingInt32> TCPReceiver::ackno() const {
    if (_isn.has_value()) {
        // The FIN occupies one sequence number once the stream has ended
        return {wrap(_reassembler.expect(), _isn.value()) + (_reassembler.stream_out().input_ended() ? 1 : 0)};
    } else {
        return std::nullopt;
    }
}

size_t TCPReceiver::window_size() const {
    return _capacity - _reassembler.stream_out().buffer_size();
}

Lab3 TCP sender

This lab deals with more of the details of TCP: sending SYN and FIN packets, processing ACKs, and implementing timeout retransmission.

The SYN packet

At first I wondered whether the SYN packets of the client and the server differ and how to handle that. In fact the two are the same: both carry the SYN flag and an initial sequence number. The only difference is that the server’s SYN also ACKs the client’s SYN. ACKs, however, are handled in the TCP connection, so the TCP sender only has to send a SYN packet and need not worry about the rest.

When should the SYN packet be sent? At first I built the SYN packet in the constructor and put it in the send queue. That worked for this lab but not for the next one: the server starts in the LISTEN state, and no packets should be sent in that state. The SYN should therefore be sent from fill_window(), which sends it first if no SYN has been sent yet.

The SYN packet itself is simple: set the SYN flag to 1 and set seqno to the initial sequence number. Note that the SYN occupies one sequence number.

The FIN packet

When the user program ends the outgoing stream, a FIN packet can be sent to close the connection in that direction. Sending it is fairly simple: just check in fill_window whether the stream has ended. A FIN can ride along with a data packet; if the stream has ended, set that packet’s FIN flag to 1. The FIN also occupies one sequence number. Once the ACK for the FIN arrives, that direction has been closed successfully.

ACK processing

ACKs are delivered to the sender through the ack_received function. On receiving an ACK, the send window can slide to the right; check that the ACK is not older than the previous one, so that the window never moves left. Finally, remove every fully acknowledged packet from the queue of packets awaiting acknowledgment.
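
The ACK handling just described can be sketched in isolation as follows. This is a simplified illustration rather than the lab code: it works on 64-bit absolute sequence numbers only, and the names Outstanding and AckTracker are hypothetical.

```cpp
#include <cstdint>
#include <queue>

// Hypothetical record of a segment that was sent but not yet acknowledged.
struct Outstanding {
    uint64_t seqno;   // absolute sequence number of its first byte
    uint64_t length;  // length in sequence space (SYN and FIN count as 1 each)
};

struct AckTracker {
    uint64_t acked = 0;       // left edge of the send window
    uint64_t next_seqno = 0;  // next sequence number to be sent
    std::queue<Outstanding> outstanding;

    // Record a newly sent segment.
    void sent(uint64_t length) {
        outstanding.push({next_seqno, length});
        next_seqno += length;
    }

    // Returns true if the ACK acknowledged new data.
    bool on_ack(uint64_t ackno) {
        // Ignore ACKs for data not sent yet, and old or repeated ACKs that
        // would not move the window to the right.
        if (ackno > next_seqno || ackno <= acked) return false;
        acked = ackno;
        // Drop every segment that is now fully acknowledged.
        while (!outstanding.empty() &&
               outstanding.front().seqno + outstanding.front().length <= acked) {
            outstanding.pop();
        }
        return true;
    }

    uint64_t bytes_in_flight() const { return next_seqno - acked; }
};
```

A partially acknowledged segment stays in the queue, so a later timeout still retransmits it whole.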

Retransmission

This lab only requires timeout retransmission, but I also added fast retransmission. In theory, every packet must be retransmitted when it times out, so each packet would need its own timer, which is very expensive. In practice, one timer is kept per TCP connection, and a retransmission happens when the elapsed time exceeds the RTO. The timer rules are:

  • When a packet is sent and the timer is off: start the timer
  • When a packet is sent and the timer is on: do nothing
  • When an ACK is received and all packets are acknowledged: stop the timer
  • When an ACK is received and unacknowledged packets remain: restart the timer

Timeout retransmission uses exponential backoff: after each timeout retransmission the next timeout is doubled, i.e. 1 RTO, 2 RTO, 4 RTO, 8 RTO…
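
The four timer rules and the doubling can be put together in a small standalone class. This is a sketch under assumptions, not the lab code: RetxTimer and its method names are hypothetical, times are in milliseconds, and the zero-window case (where the lab code skips the doubling) is left out.

```cpp
#include <cstdint>

// Hypothetical single retransmission timer for one connection.
class RetxTimer {
    uint64_t _initial_rto;
    uint64_t _rto;
    uint64_t _elapsed = 0;
    bool _running = false;

  public:
    explicit RetxTimer(uint64_t initial_rto) : _initial_rto(initial_rto), _rto(initial_rto) {}

    // Rules 1 and 2: start the timer on send only if it is currently off.
    void on_send() {
        if (!_running) {
            _running = true;
            _elapsed = 0;
        }
    }

    // Rules 3 and 4: an ACK for new data resets the RTO; the timer stops if
    // nothing is outstanding and restarts otherwise.
    void on_new_ack(bool anything_outstanding) {
        _rto = _initial_rto;
        _elapsed = 0;
        _running = anything_outstanding;
    }

    // Advance time. Returns true when the timer expires; the caller then
    // retransmits, and the next timeout is twice as long.
    bool tick(uint64_t ms) {
        if (!_running) return false;
        _elapsed += ms;
        if (_elapsed < _rto) return false;
        _rto *= 2;
        _elapsed = 0;
        return true;
    }

    uint64_t rto() const { return _rto; }
    bool running() const { return _running; }
};
```

With an initial RTO of 100 ms, the first expiry comes after 100 ms, the second 200 ms later, the third 400 ms after that, until an ACK for new data resets the RTO to 100 ms.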

When the timer expires, packets must be retransmitted. In this lab only the first unacknowledged packet is retransmitted. The drawback is that any later lost packets can only be resent after the first one has been ACKed, which takes a long time. The alternative is to retransmit all outstanding packets, which risks overloading the network. Other mechanisms were introduced to address this.

Fast retransmission

Fast retransmission means retransmitting immediately after receiving three duplicate ACKs (not counting the original ACK). It is data-driven rather than time-driven, which avoids the slowness of timeout retransmission. It still leaves open the question of whether to retransmit one packet or all of them.
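
Counting duplicates can be sketched on its own like this. The class name DupAckCounter is hypothetical and the sketch is deliberately simplified: it looks only at the acknowledgment number, whereas a real stack would also want outstanding data and an unchanged window before calling an ACK a duplicate.

```cpp
#include <cstdint>

// Hypothetical duplicate-ACK counter for fast retransmit.
class DupAckCounter {
    uint64_t _last_ack = 0;
    unsigned _dups = 0;

  public:
    // Feed each incoming absolute ackno. Returns true when the third
    // duplicate (the fourth identical ACK overall) arrives.
    bool on_ack(uint64_t ackno) {
        if (ackno > _last_ack) {
            // New data acknowledged: remember it and start counting afresh.
            _last_ack = ackno;
            _dups = 0;
            return false;
        }
        if (ackno < _last_ack) return false;  // stale ACK: ignore
        if (++_dups == 3) {
            _dups = 0;
            return true;  // caller retransmits the first outstanding segment
        }
        return false;
    }
};
```

The important detail is that the duplicate test must run for ACKs equal to the last acknowledgment number, not only for ACKs that advance it.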

SACK

SACK is the selective acknowledgment mechanism. The receiver uses SACK to report the fragments it has received, so the sender can retransmit more precisely instead of resending every packet.

SACK does have the problem of receiver reneging: the receiver is allowed to discard data it has already reported in a SACK. This is discouraged but possible. The sender therefore cannot rely on SACK alone; it still depends on the cumulative ACK and keeps its timer, and if later ACKs do not advance, it retransmits the SACKed data anyway. Likewise, the receiver must never treat SACKed packets as ACKed.

#include "tcp_sender.hh"
#include "tcp_config.hh"

#include <random>
#include <cassert>

using namespace std;

TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
    : _isn(fixed_isn.value_or(WrappingInt32{random_device()()}))
    , _initial_retransmission_timeout{retx_timeout}
    , _stream(capacity) { }

uint64_t TCPSender::bytes_in_flight() const {
    return _next_seqno - _expect_ack;
}

void TCPSender::fill_window() {
    if (!_syn_sent) {
        // First call: send the SYN, which occupies sequence number 0
        TCPSegment seg;
        seg.header().syn = true;
        seg.header().seqno = wrap(0, _isn);
        _segments_out.push(seg);
        _seg_not_ack.push(seg);
        _next_seqno = 1;
        _retrans_timer = _tick + _initial_retransmission_timeout;
        _syn_sent = true;
    }
    // Guard against unsigned underflow when the window is smaller than the bytes in flight
    uint64_t remain = _window_size > bytes_in_flight() ? _window_size - bytes_in_flight() : 0;
    bool send = false;
    if (_expect_ack != 0) {
        // SYN has been acknowledged
        while (remain > 0 && _stream.buffer_size() > 0) {
            // send segment
            uint64_t send_bytes = min(remain, TCPConfig::MAX_PAYLOAD_SIZE);
            string payload = _stream.read(send_bytes);
            TCPSegment seg;
            seg.header().seqno = wrap(_next_seqno, _isn);
            seg.payload() = move(payload);
            _next_seqno += seg.length_in_sequence_space();
            remain = _window_size > bytes_in_flight() ? _window_size - bytes_in_flight() : 0;
            if (_stream.eof() && remain > 0 && !_fin_sent) {
                // piggyback the FIN on the last data segment
                seg.header().fin = true;
                _next_seqno += 1;
                _fin_sent = true;
            }
            _segments_out.push(seg);
            _seg_not_ack.push(seg);
            send = true;
        }
    }
    if (_stream.eof() && remain > 0 && !_fin_sent) {
        // send FIN
        TCPSegment seg;
        seg.header().fin = true;
        seg.header().seqno = wrap(_next_seqno, _isn);
        _segments_out.push(seg);
        _seg_not_ack.push(seg);
        _next_seqno += 1;
        _fin_sent = true;
        send = true;
    }

    if (send && _retrans_timer == 0) {
        // open timer
        _retrans_timer = _tick + _initial_retransmission_timeout;
        _consecutive_retransmissions = 0;
        _rto_back_off = 0;
    }
}

void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
    _window_size = window_size;
    _do_back_off = 1;
    if (_window_size == 0) {
        // Treat a zero window as 1 so the sender keeps probing, and skip the RTO back-off
        _window_size = 1;
        _do_back_off = 0;
    }
    uint64_t ack = unwrap(ackno, _isn, _expect_ack);
    if (ack <= _next_seqno && ack > _expect_ack) {
        // NOTE: ack > _expect_ack holds here, so the ack == _expect_ack branch
        // below never runs and _same_ack is never incremented as written.
        if (ack == _expect_ack) {
            _same_ack++;
        } else {
            _same_ack = 0;
        }
        _expect_ack = ack;
        if (bytes_in_flight() == 0) {
            // close timer
            _retrans_timer = 0;
            _consecutive_retransmissions = 0;
            _rto_back_off = 0;
        } else {
            // reopen timer
            _retrans_timer = _tick + _initial_retransmission_timeout;
            _consecutive_retransmissions = 0;
            _rto_back_off = 0;
        }
    }
    // remove all acked packets
    while (!_seg_not_ack.empty()) {
        TCPSegment seg = _seg_not_ack.front();
        if (seg.length_in_sequence_space() + unwrap(seg.header().seqno, _isn, _expect_ack) <= _expect_ack) {
            _seg_not_ack.pop();
        } else {
            break;
        }
    }
    // fast retransmit
    if (_same_ack == 3 && !_seg_not_ack.empty()) {
        // cout << "!! FASTER RETRANSMIT" << endl;
        _same_ack = 0;
        TCPSegment seg = _seg_not_ack.front();
        _segments_out.push(seg);
        _consecutive_retransmissions += 1;
        _rto_back_off += _do_back_off;
        _retrans_timer += _initial_retransmission_timeout << _rto_back_off;
    }
}

void TCPSender::tick(const size_t ms_since_last_tick) {
    _tick += ms_since_last_tick;

    if (!_seg_not_ack.empty() && _tick >= _retrans_timer) {
        // retransmit the first packet
        // cout << "retransmit" << endl;
        TCPSegment seg = _seg_not_ack.front();
        _segments_out.push(seg);
        _consecutive_retransmissions += 1;
        // exponential back-off: the next timeout is twice as long
        _rto_back_off += _do_back_off;
        _retrans_timer = _tick + (_initial_retransmission_timeout << _rto_back_off);
    }
}

unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmissions; }

void TCPSender::send_empty_segment() {
    TCPSegment seg;
    seg.header().seqno = wrap(_next_seqno, _isn);
    _segments_out.push(seg);
}

Lab4 TCP connection

This lab combines the sender and receiver from the previous labs into a complete TCP peer.

The main work is: take segments out of the sender’s outgoing queue and put them into the connection’s outgoing queue; attach ACKs to confirm received data; handle RST; and handle connection close and the TIME_WAIT state.

When connect is called, it calls fill_window to generate the SYN packet and then sends it.

When a packet is received, its information is handed to the sender and the receiver for processing, and then it is ACKed. If the sending queue already holds a packet, the ACK is attached to it; otherwise an empty packet is generated to carry the ACK. Note that a received packet that is only an ACK and carries no data must not be ACKed. Once the receiver has received all the data and the FIN, its stream is closed.

When the application ends the connection’s outgoing stream, call end_input on the sender’s stream and then fill_window to generate and send the FIN packet.

Closing the connection

There are two ways a TCP connection can be closed: active close and passive close.

In an active close, the side that sends its FIN first enters FIN_WAIT_1, and moves to FIN_WAIT_2 when that FIN is ACKed. After it receives the peer’s FIN and ACKs it, it enters TIME_WAIT, where it waits 2MSL (60 s on Linux) before releasing the connection.

The purpose of TIME_WAIT is to make sure the final ACK reaches the peer. Waiting is the only option because ACKs are not themselves acknowledged. If the final ACK is lost, the peer resends its FIN, and this side can ACK it again. If the connection were released without TIME_WAIT, the next connection using the same port might receive a retransmitted FIN from the previous connection, causing confusion.

When the peer closes first, this side closes passively. On receiving the peer’s FIN and ACKing it, this side enters CLOSE_WAIT. When its own outgoing stream ends, it sends a FIN and enters LAST_ACK, then closes the connection once that FIN is ACKed. A passive close does not need TIME_WAIT.
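
The two close paths can be written down as a small transition function. This is an illustrative sketch only: the State and Event enums and next_state are hypothetical names, the lab code itself tracks these conditions through stream flags rather than an explicit state variable, and simultaneous close (the CLOSING state) is left out.

```cpp
enum class State { ESTABLISHED, FIN_WAIT_1, FIN_WAIT_2, TIME_WAIT, CLOSE_WAIT, LAST_ACK, CLOSED };
enum class Event { SEND_FIN, RECV_ACK_OF_FIN, RECV_FIN, TIMEOUT_2MSL };

// Next state for the active and passive close paths.
// Events that do not apply in a state leave the state unchanged.
State next_state(State s, Event e) {
    switch (s) {
        case State::ESTABLISHED:
            if (e == Event::SEND_FIN) return State::FIN_WAIT_1;  // active close
            if (e == Event::RECV_FIN) return State::CLOSE_WAIT;  // passive close
            break;
        case State::FIN_WAIT_1:
            if (e == Event::RECV_ACK_OF_FIN) return State::FIN_WAIT_2;
            break;
        case State::FIN_WAIT_2:
            if (e == Event::RECV_FIN) return State::TIME_WAIT;
            break;
        case State::TIME_WAIT:
            if (e == Event::TIMEOUT_2MSL) return State::CLOSED;
            break;
        case State::CLOSE_WAIT:
            if (e == Event::SEND_FIN) return State::LAST_ACK;
            break;
        case State::LAST_ACK:
            if (e == Event::RECV_ACK_OF_FIN) return State::CLOSED;
            break;
        case State::CLOSED:
            break;
    }
    return s;
}
```

Only the active path passes through TIME_WAIT; the passive path goes from LAST_ACK straight to CLOSED.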

#include "tcp_connection.hh"

#include <iostream>
#include <limits>

using namespace std;

void TCPConnection::send_all_segments() {
    if (_closed) return;
    // Move every segment from the sender's queue to the connection's queue,
    // stamping it with the current ackno and window size
    while (!_sender.segments_out().empty()) {
        TCPSegment& seg = _sender.segments_out().front();
        if (_receiver.ackno().has_value()) {
            seg.header().ack = true;
            seg.header().ackno = _receiver.ackno().value();
        }
        size_t max_win = numeric_limits<uint16_t>::max();
        seg.header().win = min(_receiver.window_size(), max_win);
        _segments_out.push(seg);
        _sender.segments_out().pop();
    }
    if (_sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && _receiver.stream_out().input_ended()) {
        if (_linger_after_streams_finish) {
            _time_wait = true;
        }
    }
}

size_t TCPConnection::remaining_outbound_capacity() const {
    return _sender.stream_in().remaining_capacity();
}

size_t TCPConnection::bytes_in_flight() const {
    return _sender.bytes_in_flight();
}

size_t TCPConnection::unassembled_bytes() const {
    return _receiver.unassembled_bytes();
}

size_t TCPConnection::time_since_last_segment_received() const {
    return _ticks - _last_ack_time;
}

void TCPConnection::segment_received(const TCPSegment &seg) {
    if (!_syn_sent && !seg.header().syn) return;
    if (seg.header().rst) {
        // reset connection
        _sender.stream_in().set_error();
        _receiver.stream_out().set_error();
        _linger_after_streams_finish = false;
        return;  // nothing more to process or send on a reset connection
    }

    _last_ack_time = _ticks;
    _receiver.segment_received(seg);
    _sender.ack_received(seg.header().ackno, seg.header().win);
    _sender.fill_window();
    _syn_sent = true;

    if (_receiver.stream_out().input_ended() && !_sender.stream_in().eof()) {
        // passive close
        _linger_after_streams_finish = false;
    }

    if (!_receiver.ackno().has_value()) {
        return; // no need for ack
    }
    if (_sender.segments_out().empty()) {
        // generate an empty segment to ack
        if (_receiver.stream_out().input_ended() && !seg.header().fin) {
            // no need to ack, server closed and seg not fin
        } else if (seg.length_in_sequence_space() == 0) {
            // no need to ack the empty-ack
        } else {
            _sender.send_empty_segment();
        }
    }
    // send with ack
    send_all_segments();
}

bool TCPConnection::active() const {
    if (_sender.stream_in().error() && _receiver.stream_out().error()) return false;
    return !(_sender.stream_in().eof() && _sender.bytes_in_flight() == 0 && _receiver.stream_out().input_ended()) || _time_wait;
}

size_t TCPConnection::write(const string &data) {
    size_t wrote = _sender.stream_in().write(data);
    _sender.fill_window();
    send_all_segments();
    return wrote;
}

void TCPConnection::tick(const size_t ms_since_last_tick) {
    _ticks += ms_since_last_tick;
    _sender.tick(ms_since_last_tick);

    if (_time_wait && _ticks >= _last_ack_time + _cfg.rt_timeout * 10) {
        // closed
        _time_wait = false;
        _closed = true;
    }

    if (_sender.consecutive_retransmissions() > _cfg.MAX_RETX_ATTEMPTS) {
        // RST
        _sender.stream_in().set_error();
        _receiver.stream_out().set_error();
        _linger_after_streams_finish = false;
        while (!_sender.segments_out().empty()) {
            // pop all segments
            _sender.segments_out().pop();
        }
        _sender.send_empty_segment();
        TCPSegment& seg = _sender.segments_out().front();
        seg.header().rst = true;
    }
    send_all_segments();
}

void TCPConnection::end_input_stream() {
    _sender.stream_in().end_input();
    _sender.fill_window();
    send_all_segments();
}

void TCPConnection::connect() {
    // send SYN
    if (!_syn_sent) {
        _sender.fill_window();
        _syn_sent = true;
        TCPSegment& seg = _sender.segments_out().front();
        size_t max_win = numeric_limits<uint16_t>::max();
        seg.header().win = min(_receiver.window_size(), max_win);
        _segments_out.push(seg);
        _sender.segments_out().pop();
    }
}

TCPConnection::~TCPConnection() {
    try {
        if (active()) {
            cerr << "Warning: Unclean shutdown of TCPConnection\n";
            _sender.stream_in().set_error();
            _receiver.stream_out().set_error();
            _linger_after_streams_finish = false;
            while (!_sender.segments_out().empty()) {
                // pop all segments
                _sender.segments_out().pop();
            }
            _sender.send_empty_segment();
            TCPSegment& seg = _sender.segments_out().front();
            seg.header().rst = true;
            send_all_segments();
        }
    } catch (const exception &e) {
        std::cerr << "Exception destructing TCP FSM: " << e.what() << std::endl;
    }
}

Testing

At this point the simple TCP implementation is complete. Swapping it into the webget program from Lab0 makes it possible to fetch web pages, and packet capture software shows the whole process of establishing a connection, sending data, and closing the connection. There is one problem I cannot explain, though: cs144.keithw.org and bilibili.com work normally with webget, but with www.baidu.com one packet after connection establishment is lost, and the capture shows “TCP Previous segment not captured”. I don’t know whether this is a bug in my code or some special strategy used by Baidu’s servers.