Preface

This article mainly studies the exportRDB of ClaudB

exportRDB

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/DBServerState.java

public class DBServerState { //...... public void exportRDB(OutputStream output) throws IOException { RDBOutputStream rdb = new RDBOutputStream(output); rdb.preamble(RDB_VERSION); for (int i = 0; i < databases.size(); i++) { Database db = databases.get(i); if (! db.isEmpty()) { rdb.select(i); rdb.dabatase(db); } } rdb.end(); } / /... }Copy the code
  • The exportRDB method first calls rdb.preamble(RDB_VERSION) to write the "REDIS" magic string and the version. It then walks the databases and, for each non-empty one, calls rdb.select(i) to write the SELECT marker followed by the length-encoded database index, and then rdb.dabatase(db), which traverses the entries and writes each one as expiredAt, type, key and value. Finally, rdb.end() writes END_OF_STREAM followed by the checksum.

RDBOutputStream

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/persistence/RDBOutputStream.java

public class RDBOutputStream {

  private static final byte[] REDIS = safeString("REDIS").getBytes();

  private static final int TTL_MILLISECONDS = 0xFC;
  private static final int END_OF_STREAM = 0xFF;
  private static final int SELECT = 0xFE;

  private final CheckedOutputStream out;

  public RDBOutputStream(OutputStream out) {
    super();
    this.out = new CheckedOutputStream(out, new CRC64());
  }

  public void preamble(int version) throws IOException {
    out.write(REDIS);
    out.write(version(version));
  }

  private byte[] version(int version) {
    StringBuilder sb = new StringBuilder(String.valueOf(version));
    for (int i = sb.length(); i < Integer.BYTES; i++) {
      sb.insert(0, '0');
    }
    return sb.toString().getBytes(StandardCharsets.UTF_8);
  }

  public void select(int db) throws IOException {
    out.write(SELECT);
    length(db);
  }

  public void dabatase(Database db) throws IOException {
    for (Tuple2<DatabaseKey, DatabaseValue> entry : db.entrySet()) {
      value(entry.get1(), entry.get2());
    }
  }

  private void value(DatabaseKey key, DatabaseValue value) throws IOException {
    expiredAt(value.getExpiredAt());
    type(value.getType());
    key(key);
    value(value);
  }

  private void expiredAt(Instant expiredAt) throws IOException {
    if (expiredAt != null) {
      out.write(TTL_MILLISECONDS);
      out.write(ByteUtils.toByteArray(expiredAt.toEpochMilli()));
    }
  }

  private void type(DataType type) throws IOException {
    out.write(type.ordinal());
  }

  private void key(DatabaseKey key) throws IOException {
    string(key.getValue());
  }

  private void value(DatabaseValue value) throws IOException {
    switch (value.getType()) {
    case STRING:
      string(value.getString());
      break;
    case LIST:
      list(value.getList());
      break;
    case HASH:
      hash(value.getHash());
      break;
    case SET:
      set(value.getSet());
      break;
    case ZSET:
      zset(value.getSortedSet());
      break;
    default:
      break;
    }
  }

  private void length(int length) throws IOException {
    if (length < 0x40) {
      // 1 byte: 00XXXXXX
      out.write(length);
    } else if (length < 0x4000) {
      // 2 bytes: 01XXXXXX XXXXXXXX
      int b1 = length >> 8;
      int b2 = length & 0xFF;
      out.write(0x40 | b1);
      out.write(b2);
    } else {
      // 5 bytes: 10...... XXXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
      out.write(0x80);
      out.write(toByteArray(length));
    }
  }

  private void string(String value) throws IOException {
    string(safeString(value));
  }

  private void string(SafeString value) throws IOException {
    byte[] bytes = value.getBytes();
    length(bytes.length);
    out.write(bytes);
  }

  private void string(double value) throws IOException {
    string(String.valueOf(value));
  }

  private void list(ImmutableList<SafeString> value) throws IOException {
    length(value.size());
    for (SafeString item : value) {
      string(item);
    }
  }

  private void hash(ImmutableMap<SafeString, SafeString> value) throws IOException {
    length(value.size());
    for (Tuple2<SafeString, SafeString> entry : value.entries()) {
      string(entry.get1());
      string(entry.get2());
    }
  }

  private void set(ImmutableSet<SafeString> value) throws IOException {
    length(value.size());
    for (SafeString item : value) {
      string(item);
    }
  }

  private void zset(NavigableSet<Entry<Double, SafeString>> value) throws IOException {
    length(value.size());
    for (Entry<Double, SafeString> item : value) {
      string(item.getValue());
      string(item.getKey());
    }
  }

  public void end() throws IOException {
    out.write(END_OF_STREAM);
    out.write(toByteArray(out.getChecksum().getValue()));
    out.flush();
  }
}
Copy the code
  • The constructor of RDBOutputStream wraps the OutputStream in a CheckedOutputStream with a CRC64 checksum. Its dabatase method iterates over db.entrySet() and calls the two-argument value method for each entry, which in turn calls expiredAt, type, key and the one-argument value method. The one-argument value method handles STRING, LIST, HASH, SET and ZSET values differently. The string method writes the length of the string followed by its bytes; the list method writes the size of the list and then its elements one by one; the hash method writes the size of the hash and then each key and value; the set method writes the size of the set and then its elements one by one; the zset method writes the size of the sorted set and then, for each entry, the value followed by the score. The end method writes END_OF_STREAM, followed by the checksum.

CheckedOutputStream

java.base/java/util/zip/CheckedOutputStream.java

/**
 * An output stream that forwards every write to the wrapped stream and feeds the
 * same bytes into a {@link Checksum}.
 */
public class CheckedOutputStream extends FilterOutputStream {

    /** Checksum that is updated with every byte written through this stream. */
    private final Checksum checksum;

    /**
     * Creates an output stream with the specified Checksum.
     *
     * @param out the output stream
     * @param cksum the checksum
     */
    public CheckedOutputStream(OutputStream out, Checksum cksum) {
        super(out);
        checksum = cksum;
    }

    /**
     * Writes a byte. Will block until the byte is actually written.
     *
     * @param b the byte to be written
     * @exception IOException if an I/O error has occurred
     */
    @Override
    public void write(int b) throws IOException {
        // Forward first: if the write throws, the checksum is left untouched.
        this.out.write(b);
        checksum.update(b);
    }

    /**
     * Writes an array of bytes. Will block until the bytes are
     * actually written.
     *
     * @param b the data to be written
     * @param off the start offset of the data
     * @param len the number of bytes to be written
     * @exception IOException if an I/O error has occurred
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        // Forward first: if the write throws, the checksum is left untouched.
        this.out.write(b, off, len);
        checksum.update(b, off, len);
    }

    /**
     * Returns the Checksum for this output stream.
     *
     * @return the Checksum
     */
    public Checksum getChecksum() {
        return checksum;
    }
}
Copy the code
  • CheckedOutputStream extends FilterOutputStream. Its constructor takes an OutputStream and a Checksum, and every write is followed by a call to cksum.update with the same bytes. Its getChecksum method returns cksum directly.

CRC64

/**
 * Table-driven 64-bit CRC implementing {@link Checksum}.
 *
 * <p>Uses the reflected polynomial {@code 0xC96C5795D7870F42}, starts from an
 * all-ones register and inverts the register on output; these parameters match
 * the CRC-64/XZ variant.
 */
public class CRC64 implements Checksum {

  private static final int LOOKUPTABLE_SIZE = 256;
  private static final long POLY64REV = 0xC96C5795D7870F42L;
  private static final long[] LOOKUPTABLE = new long[LOOKUPTABLE_SIZE];

  private long crc = -1;

  static {
    // Precompute, for every possible byte value, the result of shifting it
    // through the polynomial one bit at a time.
    for (int b = 0; b < LOOKUPTABLE.length; ++b) {
      long r = b;
      for (int i = 0; i < Byte.SIZE; ++i) {
        if ((r & 1) == 1) {
          r = (r >>> 1) ^ POLY64REV;
        } else {
          r >>>= 1;
        }
      }
      LOOKUPTABLE[b] = r;
    }
  }

  /**
   * Feeds a single byte into the checksum.
   *
   * @param b the byte to add; only the low eight bits are used
   */
  @Override
  public void update(int b) {
    crc = LOOKUPTABLE[((b & 0xFF) ^ (int) crc) & 0xFF] ^ (crc >>> 8);
  }

  /**
   * Feeds {@code len} bytes of {@code buf}, starting at {@code off}, into the checksum.
   *
   * @param buf source array
   * @param off index of the first byte to add
   * @param len number of bytes to add
   */
  @Override
  public void update(byte[] buf, int off, int len) {
    int end = off + len;
    while (off < end) {
      crc = LOOKUPTABLE[(buf[off++] ^ (int) crc) & 0xFF] ^ (crc >>> 8);
    }
  }

  /**
   * Returns the checksum of the bytes seen so far.
   *
   * @return the bitwise complement of the internal register
   */
  @Override
  public long getValue() {
    return ~crc;
  }

  /** Restores the initial all-ones register. */
  @Override
  public void reset() {
    crc = -1;
  }
}
  • CRC64 implements the Checksum interface. Its update methods use the precomputed LOOKUPTABLE to fold each byte into crc, and getValue returns the bitwise complement of crc.

summary

The exportRDB method first calls rdb.preamble(RDB_VERSION) to write the "REDIS" magic string and the version. It then walks the databases and, for each non-empty one, calls rdb.select(i) to write the SELECT marker followed by the length-encoded database index, and then rdb.dabatase(db), which traverses the entries and writes each one as expiredAt, type, key and value. Finally, rdb.end() writes END_OF_STREAM followed by the checksum.

doc

  • DBServerState