Preface

This article takes a look at CRDTs

CRDT

CRDT is short for Conflict-free Replicated Data Type, also known as passive synchronisation: a data type that can be replicated across the network and resolves conflicts automatically, so that all replicas reach a consistent state. CRDTs suit systems with an AP architecture, where data has to be replicated between partitions. Implementations fall into several families: state-based (CvRDT), operation-based (CmRDT), delta-based, pure operation-based, and so on

Consistency without consensus: CRDTs guarantee that replicas converge to the same value in spite of network delays, partitions and message reordering

State-based(CvRDT)

  • CvRDT, short for Convergent Replicated Data Type and also known as active synchronisation, is commonly used in file systems such as NFS, AFS and Coda, and in key-value stores such as Riak and Dynamo
  • Replication works by shipping the entire object state; you must define a merge function that combines an incoming object state with the local one
  • The merge function must be commutative and idempotent (and associative), so that merges can be retried and applied in any order
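To make the state-based contract concrete, here is a minimal sketch, assuming a hypothetical `StateBasedCrdt` interface and a hypothetical `MaxRegister` type (neither comes from wurmloch-crdt). The merge is `max`, which is commutative, associative and idempotent, so a replica ends up in the same state no matter in which order states arrive or how often a delivery is retried.

```java
import java.util.Arrays;
import java.util.List;

// A state-based (CvRDT) object: replicas ship whole states and combine them with merge().
interface StateBasedCrdt<S extends StateBasedCrdt<S>> {
    S merge(S other);
}

// Hypothetical example: a register that keeps the largest value it has seen.
// merge = max, which is commutative, associative and idempotent.
final class MaxRegister implements StateBasedCrdt<MaxRegister> {
    final long value;

    MaxRegister(long value) {
        this.value = value;
    }

    @Override
    public MaxRegister merge(MaxRegister other) {
        return new MaxRegister(Math.max(value, other.value));
    }
}

final class StateSync {
    // Folds the states a replica received (in arrival order) into its local state.
    static MaxRegister receiveAll(MaxRegister local, List<MaxRegister> received) {
        MaxRegister current = local;
        for (MaxRegister state : received) {
            current = current.merge(state);
        }
        return current;
    }

    public static void main(String[] args) {
        MaxRegister a = new MaxRegister(3), b = new MaxRegister(7), c = new MaxRegister(5);
        // Same states, different arrival order, and one retried (duplicated) delivery.
        MaxRegister r1 = receiveAll(new MaxRegister(0), Arrays.asList(a, b, c));
        MaxRegister r2 = receiveAll(new MaxRegister(0), Arrays.asList(c, b, b, a));
        System.out.println(r1.value + " " + r2.value); // prints "7 7"
    }
}
```

Because the whole state travels each time, the network only needs to deliver states eventually; duplicates and reordering are absorbed by the merge laws.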

Operation-based(CmRDT)

  • CmRDT, short for Commutative Replicated Data Type, is commonly used in cooperative systems such as Bayou, Rover, IceCube and Telex
  • Replication works by shipping operations: a prepare method generates the operation, and an effect method applies an incoming operation's changes to the local state
  • The transport protocol must be reliable. If an operation may be delivered more than once, effect must be idempotent; ordering also matters, and if delivery order cannot be guaranteed, the effects of concurrent operations must commute
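The prepare/effect split can be sketched with a counter. This is a minimal illustration, assuming a hypothetical `OpCounter` class and `AddOp` operation (not wurmloch-crdt types): each operation carries a unique id so that `effect` can drop duplicates (idempotence), and addition commutes, so the delivery order does not matter.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical op-based (CmRDT) counter sketch.
final class OpCounter {
    // The operation shipped over the network: a unique id plus the amount to add.
    static final class AddOp {
        final String id;
        final long amount;

        AddOp(String id, long amount) {
            this.id = id;
            this.amount = amount;
        }
    }

    private long value = 0;
    // Ids of operations already applied; lets effect() ignore re-deliveries.
    private final Set<String> applied = new HashSet<>();

    // prepare: runs only at the origin replica and turns a user request into an operation.
    AddOp prepare(long amount) {
        return new AddOp(UUID.randomUUID().toString(), amount);
    }

    // effect: runs at every replica (the origin included) and applies the operation locally.
    void effect(AddOp op) {
        if (applied.add(op.id)) { // Set.add returns false if this id was seen before
            value += op.amount;   // addition commutes, so delivery order does not matter
        }
    }

    long value() {
        return value;
    }
}
```

A counter's prepare does not need to read local state; types such as an OR-Set, whose remove must capture the tags currently observed, do.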

Delta-based

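Delta-based CRDTs can be seen as an improvement that combines the state-based and operation-based approaches: instead of whole states, replicas ship small delta-states produced by delta-mutators, and merge them with the same merge function used for full states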
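A delta-based G-Counter shows the idea in a few lines. This is a minimal sketch, assuming a hypothetical `DeltaGCounter` class with plain `java.util` maps: the delta-mutator `increment` returns only the entry it changed, and because that entry holds the node's new absolute count, merging it with entry-wise max stays idempotent, just like merging a full state.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical delta-state G-Counter sketch (not the wurmloch-crdt API).
final class DeltaGCounter {
    private final String nodeId;
    // Full state: one monotonically growing count per node.
    private final Map<String, Long> counts = new HashMap<>();

    DeltaGCounter(String nodeId) {
        this.nodeId = nodeId;
    }

    // Delta-mutator: updates local state and returns a delta containing only this node's new entry.
    Map<String, Long> increment(long amount) {
        if (amount < 1) {
            throw new IllegalArgumentException("amount must be positive");
        }
        long next = counts.getOrDefault(nodeId, 0L) + amount;
        counts.put(nodeId, next);
        Map<String, Long> delta = new HashMap<>();
        delta.put(nodeId, next);
        return delta;
    }

    // Join: entry-wise max. The same function merges full states and deltas; re-merging is harmless.
    void merge(Map<String, Long> stateOrDelta) {
        stateOrDelta.forEach((node, count) -> counts.merge(node, count, Long::max));
    }

    long value() {
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }
}
```

Shipping the one-entry delta instead of the whole map is what saves bandwidth compared to a pure state-based G-Counter.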

Pure operation-based

In the operation-based approach, the prepare method has to generate the operation, which may introduce delay. Pure operation-based means that prepare does not build the operation by comparing against local state; it simply returns the operation as-is, so each replica has to record the operations applied at every step to the object state and derive the value from them
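A pure operation-based counter can be sketched as follows, assuming a hypothetical `PureOpCounter` class. `prepare` just wraps the user's request, `effect` records it, and the value is computed from the recorded operations. Two simplifications are labelled as assumptions: a plain list stands in for the partially ordered log used in the literature, and the messaging layer is assumed to deliver every operation exactly once.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pure operation-based counter sketch.
final class PureOpCounter {
    // The operation is just the user's request; nothing is computed from replica state.
    static final class Add {
        final long amount;

        Add(long amount) {
            this.amount = amount;
        }
    }

    // Replica state: the operations delivered so far (simplified stand-in for a partially ordered log).
    private final List<Add> log = new ArrayList<>();

    // prepare: returns the operation as-is, without inspecting replica state.
    static Add prepare(long amount) {
        return new Add(amount);
    }

    // effect: records the delivered operation (exactly-once delivery assumed).
    void effect(Add op) {
        log.add(op);
    }

    // query: the value is derived from the recorded operations.
    long value() {
        return log.stream().mapToLong(op -> op.amount).sum();
    }
}
```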

Convergent Operations

For a CRDT to be replicated without conflicts, the operations that combine its data (such as merge) must satisfy the following properties:

  • Associative

a + (b + c) = (a + b) + c: grouping does not matter

  • Commutative

a + b = b + a: order does not matter

  • Idempotent

a + a = a: duplication does not matter
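The three laws can be spot-checked mechanically. This is a minimal sketch, assuming a hypothetical `MergeLaws` helper that tests a merge function on three sample values; passing on samples is evidence, not a proof. `Long::max` (the G-Counter merge) and set union (the G-Set merge) pass, while plain addition fails idempotence, which is why a counter cannot simply add incoming states.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.BinaryOperator;

// Hypothetical helper that spot-checks the three convergence laws on sample values.
final class MergeLaws {
    static <T> boolean holds(BinaryOperator<T> merge, T a, T b, T c) {
        boolean associative = merge.apply(a, merge.apply(b, c)).equals(merge.apply(merge.apply(a, b), c));
        boolean commutative = merge.apply(a, b).equals(merge.apply(b, a));
        boolean idempotent = merge.apply(a, a).equals(a);
        return associative && commutative && idempotent;
    }

    // Set union: the merge used by a G-Set.
    static <E> Set<E> union(Set<E> x, Set<E> y) {
        Set<E> result = new HashSet<>(x);
        result.addAll(y);
        return result;
    }
}
```

For example, `MergeLaws.holds(Long::max, 3L, 9L, 4L)` is true, whereas `MergeLaws.holds(Long::sum, 3L, 9L, 4L)` is false because 3 + 3 is not 3.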

Basic data types

The basic data types of CRDT include Counters, Registers, and Sets

Counters

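  • Grow-only counter (G-Counter)

Each replica keeps one count per node and only increments its own entry; merge takes the per-node maximum (the Max function), and the counter's value is the sum of all entries

  • Positive-negative counter (PN-Counter)

Two G-Counters are used, one for increments and one for decrements; the value is the increment total minus the decrement total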

Registers

A register supports two operations, assign() and value()

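  • Last-write-wins register (LWW-Register)

Each assign is tagged with a unique, ordered id such as a timestamp or a vector clock, and merge keeps the value with the largest id (the Max function)

  • Multi-value register (MV-Register)

Each assign tags its value with a version vector (a structure similar to a G-Counter) that dominates the versions it overwrites; merge keeps every value whose version is not dominated by another, so concurrent assignments are all retained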
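A state-based LWW-Register can be sketched with a (timestamp, nodeId) tag. This is a minimal illustration, assuming a hypothetical `LwwRegister` class whose caller supplies increasing timestamps (for example a Lamport clock); the node id breaks timestamp ties, so the tags form a total order and merge is simply "keep the larger tag". wurmloch-crdt's own LWWRegister uses a StrictVectorClock instead.

```java
// Hypothetical state-based LWW-Register sketch (not the wurmloch-crdt API).
final class LwwRegister<T> {
    private final String nodeId;
    private T value;
    private long timestamp;     // tag part 1: caller-supplied timestamp (assumed increasing per node)
    private String writer = ""; // tag part 2: id of the node that wrote the value, breaks ties

    LwwRegister(String nodeId) {
        this.nodeId = nodeId;
    }

    // assign(): stores the value under the tag (timestamp, nodeId).
    void assign(T newValue, long timestamp) {
        this.value = newValue;
        this.timestamp = timestamp;
        this.writer = nodeId;
    }

    T value() {
        return value;
    }

    // merge(): keep whichever value carries the larger (timestamp, writer) tag.
    void merge(LwwRegister<T> other) {
        boolean otherWins = other.timestamp > timestamp
                || (other.timestamp == timestamp && other.writer.compareTo(writer) > 0);
        if (otherWins) {
            value = other.value;
            timestamp = other.timestamp;
            writer = other.writer;
        }
    }
}
```

If replica "A" assigns "x" and replica "B" assigns "y" with the same timestamp, both replicas settle on "y" after merging, because "B" sorts after "A". The cost of this design is that one of two concurrent writes is silently discarded, which is exactly what an MV-Register avoids.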

Sets

  • Grow-only set (G-Set)

Merge using set union; elements can only be added

  • Two-phase set (2P-Set)

Uses two G-Sets, an addSet for additions and a removeSet for removals; an element is present if it is in addSet and not in removeSet, so once removed it can never be added back

  • Last-write-wins element set (LWW-element-Set)

Like a 2P-Set, it has an addSet and a removeSet, but every entry carries a timestamp; an element is present if its latest add is newer than its latest remove, so it can be re-added later

  • Observed-remove set (OR-Set)

Like a 2P-Set, it has an addSet and a removeSet, but every add attaches a unique tag to the element and a remove deletes only the tags it has observed; an add that is concurrent with a remove therefore wins
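The LWW-element-Set has no counterpart in the wurmloch-crdt examples, so here is a minimal state-based sketch, assuming a hypothetical `LwwElementSet` class where addSet and removeSet map each element to its latest add and remove timestamps. Merging takes the per-element maximum of both maps. When an add and a remove carry the same timestamp, this sketch lets the add win; that tie rule is a design choice, and remove-wins is equally valid.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical state-based LWW-element-Set sketch.
final class LwwElementSet<E> {
    // Latest timestamp at which each element was added / removed.
    private final Map<E, Long> addSet = new HashMap<>();
    private final Map<E, Long> removeSet = new HashMap<>();

    void add(E e, long ts) {
        addSet.merge(e, ts, Long::max);
    }

    void remove(E e, long ts) {
        removeSet.merge(e, ts, Long::max);
    }

    // Present if the latest add is at least as new as the latest remove (add wins ties here).
    boolean contains(E e) {
        Long added = addSet.get(e);
        if (added == null) {
            return false;
        }
        Long removed = removeSet.get(e);
        return removed == null || added >= removed;
    }

    Set<E> elements() {
        Set<E> result = new HashSet<>();
        for (E e : addSet.keySet()) {
            if (contains(e)) {
                result.add(e);
            }
        }
        return result;
    }

    // Merge: per-element max on both maps, hence commutative, associative and idempotent.
    void merge(LwwElementSet<E> other) {
        other.addSet.forEach((e, ts) -> addSet.merge(e, ts, Long::max));
        other.removeSet.forEach((e, ts) -> removeSet.merge(e, ts, Long::max));
    }
}
```

Unlike a 2P-Set, a removed element comes back as soon as it is added again with a newer timestamp.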

Other data types

Array

There is the Replicated Growable Array (RGA), which supports the addRight(v, a) operation: insert element a immediately to the right of element v
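A much simplified RGA can be sketched as follows, assuming a hypothetical `Rga` class that supports insertion only (no removal or tombstones) and causal delivery, so the reference element is always present when an insert arrives. Each element gets a unique id (Lamport counter, replica id). To apply addRight, the replica finds the reference element and then skips over following elements with a larger id; those are concurrent inserts after the same element that win the tie, plus everything inserted after them. Every replica makes the same choice, so all replicas converge to the same order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified RGA sketch: insertions only, no removals or tombstones.
final class Rga {
    // Unique element id: (Lamport counter, replica id), ordered by counter, then replica id.
    static final class Id implements Comparable<Id> {
        final long counter;
        final String replica;

        Id(long counter, String replica) {
            this.counter = counter;
            this.replica = replica;
        }

        @Override
        public int compareTo(Id other) {
            int byCounter = Long.compare(counter, other.counter);
            return byCounter != 0 ? byCounter : replica.compareTo(other.replica);
        }
    }

    // The broadcast operation addRight(ref, value); ref == null means "at the head".
    static final class InsertOp {
        final Id ref;
        final Id id;
        final String value;

        InsertOp(Id ref, Id id, String value) {
            this.ref = ref;
            this.id = id;
            this.value = value;
        }
    }

    private static final class Node {
        final Id id;
        final String value;

        Node(Id id, String value) {
            this.id = id;
            this.value = value;
        }
    }

    private final String replicaId;
    private long clock = 0;
    private final List<Node> nodes = new ArrayList<>();

    Rga(String replicaId) {
        this.replicaId = replicaId;
    }

    // Local addRight: picks an id larger than any seen so far, applies it, returns the op to broadcast.
    InsertOp addRight(Id ref, String value) {
        InsertOp op = new InsertOp(ref, new Id(clock + 1, replicaId), value);
        apply(op);
        return op;
    }

    // Applies a local or remote insert (causal delivery assumed: ref is already present).
    void apply(InsertOp op) {
        clock = Math.max(clock, op.id.counter);
        int i = indexOf(op.ref) + 1;
        // Skip concurrent inserts after the same element that have larger ids, and what follows them.
        while (i < nodes.size() && nodes.get(i).id.compareTo(op.id) > 0) {
            i++;
        }
        nodes.add(i, new Node(op.id, op.value));
    }

    private int indexOf(Id id) {
        if (id == null) {
            return -1;
        }
        for (int i = 0; i < nodes.size(); i++) {
            if (nodes.get(i).id.compareTo(id) == 0) {
                return i;
            }
        }
        throw new IllegalStateException("reference element not delivered yet");
    }

    String text() {
        return nodes.stream().map(n -> n.value).collect(Collectors.joining());
    }
}
```

If replicas "A" and "B" both insert right after the same element "H" concurrently, both end up with "Hyx" (B's insert has the larger id, so it sits closer to "H"), regardless of which operation each replica applied first.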

Graph

A Graph can be built on top of Sets, but it must handle concurrent operations such as addEdge(u, v) and removeVertex(u)

Map

A Map must handle concurrent PUT and RMV operations

Examples

The examples below use the wurmloch-crdt implementation

GCounter

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/GCounter.java

public class GCounter extends AbstractCrdt<GCounter, GCounter.UpdateCommand> {

    // fields
    private Map<String, Long> entries = HashMap.empty();


    // constructor
    public GCounter(String nodeId, String crdtId) {
        super(nodeId, crdtId, BehaviorProcessor.create());
    }


    // crdt
    @Override
    protected Option<UpdateCommand> processCommand(UpdateCommand command) {
        final Map<String, Long> oldEntries = entries;
        entries = entries.merge(command.entries, Math::max);
        return entries.equals(oldEntries)? Option.none() : Option.of(new UpdateCommand(crdtId, entries));
    }


    // core functionality
    public long get() {
        return entries.values().sum().longValue();
    }

    public void increment() {
        increment(1L);
    }

    public void increment(long value) {
        if (value < 1L) {
            throw new IllegalArgumentException("Value needs to be a positive number.");
        }
        entries = entries.put(nodeId, entries.get(nodeId).getOrElse(0L) + value);
        commands.onNext(new UpdateCommand(
                crdtId,
                entries
        ));
    }

    // ...
}
  • processCommand receives an UpdateCommand and merges its entries into the local map with Map.merge, using Math::max as the BiFunction; if the merge changes nothing, it returns Option.none() so no command is re-published. get() returns the sum of entries.values()

PNCounter

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/PNCounter.java

public class PNCounter extends AbstractCrdt<PNCounter, PNCounter.UpdateCommand> {

    // fields
    private Map<String, Long> pEntries = HashMap.empty();
    private Map<String, Long> nEntries = HashMap.empty();


    // constructor
    public PNCounter(String nodeId, String crtdId) {
        super(nodeId, crtdId, BehaviorProcessor.create());
    }


    // crdt
    protected Option<UpdateCommand> processCommand(PNCounter.UpdateCommand command) {
        final Map<String, Long> oldPEntries = pEntries;
        final Map<String, Long> oldNEntries = nEntries;
        pEntries = pEntries.merge(command.pEntries, Math::max);
        nEntries = nEntries.merge(command.nEntries, Math::max);
        return pEntries.equals(oldPEntries) && nEntries.equals(oldNEntries)? Option.none()
                : Option.of(new UpdateCommand(crdtId, pEntries, nEntries));
    }


    // core functionality
    public long get() {
        return pEntries.values().sum().longValue() - nEntries.values().sum().longValue();
    }

    public void increment() {
        increment(1L);
    }

    public void increment(long value) {
        if (value < 1L) {
            throw new IllegalArgumentException("Value needs to be a positive number.");
        }
        pEntries = pEntries.put(nodeId, pEntries.get(nodeId).getOrElse(0L) + value);
        commands.onNext(new UpdateCommand(
                crdtId,
                pEntries,
                nEntries
        ));
    }

    public void decrement() {
        decrement(1L);
    }

    public void decrement(long value) {
        if (value < 1L) {
            throw new IllegalArgumentException("Value needs to be a positive number.");
        }
        nEntries = nEntries.put(nodeId, nEntries.get(nodeId).getOrElse(0L) + value);
        commands.onNext(new UpdateCommand(
                crdtId,
                pEntries,
                nEntries
        ));
    }

    // ...
}
  • PNCounter uses two maps: pEntries for increments and nEntries for decrements. processCommand merges each of them with Map.merge, again using Math::max as the BiFunction, and get() returns the sum of pEntries.values() minus the sum of nEntries.values()

LWWRegister

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/LWWRegister.java

public class LWWRegister<T> extends AbstractCrdt<LWWRegister<T>, LWWRegister.SetCommand<T>> {

    // fields
    private T value;
    private StrictVectorClock clock;


    // constructor
    public LWWRegister(String nodeId, String crdtId) {
        super(nodeId, crdtId, BehaviorProcessor.create());
        this.clock = new StrictVectorClock(nodeId);
    }


    // crdt
    protected Option<SetCommand<T>> processCommand(SetCommand<T> command) {
        if (clock.compareTo(command.getClock()) < 0) {
            clock = clock.merge(command.getClock());
            doSet(command.getValue());
            return Option.of(command);
        }
        return Option.none();
    }


    // core functionality
    public T get() {
        return value;
    }

    public void set(T newValue) {
        if (!Objects.equals(value, newValue)) {
            doSet(newValue);
            commands.onNext(new SetCommand<>(
                    crdtId,
                    value,
                    clock
            ));
        }
    }


    // implementation
    private void doSet(T value) {
        this.value = value;
        clock = clock.increment();
    }

    // ...
}
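  • LWWRegister keeps a single value together with a StrictVectorClock. processCommand receives a SetCommand; if the local clock is less than command.getClock(), it merges the incoming clock into the local one and calls doSet, which updates value and advances the local clock with clock.increment(). Otherwise the command is ignored. A local set() likewise calls doSet and then publishes a SetCommand carrying the new value and clock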

MVRegister

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/MVRegister.java

public class MVRegister<T> extends AbstractCrdt<MVRegister<T>, MVRegister.SetCommand<T>> {

    // fields
    private Array<Entry<T>> entries = Array.empty();


    // constructor
    public MVRegister(String nodeId, String crdtId) {
        super(nodeId, crdtId, ReplayProcessor.create());
    }


    // crdt
    protected Option<SetCommand<T>> processCommand(SetCommand<T> command) {
        final Entry<T> newEntry = command.getEntry();
        if (!entries.exists(entry -> entry.getClock().compareTo(newEntry.getClock()) > 0
                || entry.getClock().equals(newEntry.getClock()))) {
            final Array<Entry<T>> newEntries = entries
                    .filter(entry -> entry.getClock().compareTo(newEntry.getClock()) == 0)
                    .append(newEntry);
            doSet(newEntries);
            return Option.of(command);
        }
        return Option.none();
    }


    // core functionality
    public Array<T> get() {
        return entries.map(Entry::getValue);
    }

    public void set(T newValue) {
        if (entries.size() != 1 || !Objects.equals(entries.head().getValue(), newValue)) {
            final Entry<T> newEntry = new Entry<>(newValue, incVV());
            doSet(Array.of(newEntry));
            commands.onNext(new SetCommand<>(
                    crdtId,
                    newEntry
            ));
        }
    }


    // implementation
    private void doSet(Array<Entry<T>> newEntries) {
        entries = newEntries;
    }

    private VectorClock incVV() {
        final Array<VectorClock> clocks = entries.map(Entry::getClock);
        final VectorClock mergedClock = clocks.reduceOption(VectorClock::merge).getOrElse(new VectorClock());
        return mergedClock.increment(nodeId);
    }

    // ...
}
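  • MVRegister keeps an Array of entries, each pairing a value with a VectorClock. processCommand receives a SetCommand; if no existing entry has a clock greater than or equal to newEntry.getClock(), it builds newEntries by keeping only the entries whose clock compares as 0 against the new clock (with equal clocks already ruled out, these are the concurrent ones), appends newEntry, and finally assigns the result to the local entries via doSet. Entries dominated by the new clock are dropped, so get() returns all concurrently written values. A local set() builds its clock with incVV(), which merges the clocks of all current entries and increments this node's component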

GSet

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/GSet.java

public class GSet<E> extends AbstractSet<E> implements Crdt<GSet<E>, GSet.AddCommand<E>> {

    // fields
    private final String crdtId;
    private final Set<E> elements = new HashSet<>();
    private final Processor<AddCommand<E>, AddCommand<E>> commands = ReplayProcessor.create();


    // constructor
    public GSet(String crdtId) {
        this.crdtId = Objects.requireNonNull(crdtId, "Id must not be null");
    }


    // crdt
    @Override
    public String getCrdtId() {
        return crdtId;
    }

    @Override
    public void subscribe(Subscriber<? super AddCommand<E>> subscriber) {
        commands.subscribe(subscriber);
    }

    @Override
    public void subscribeTo(Publisher<? extends AddCommand<E>> publisher) {
        Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> {
            final Option<AddCommand<E>> newCommand = processCommand(command);
            newCommand.peek(commands::onNext);
        });
    }

    private Option<AddCommand<E>> processCommand(AddCommand<E> command) {
        return doAdd(command.getElement())? Option.of(command) : Option.none();
    }


    // core functionality
    @Override
    public int size() {
        return elements.size();
    }

    @Override
    public Iterator<E> iterator() {
        return new GSetIterator();
    }

    @Override
    public boolean add(E element) {
        commands.onNext(new AddCommand<>(crdtId, element));
        return doAdd(element);
    }


    // implementation
    private synchronized boolean doAdd(E element) {
        return elements.add(element);
    }

    // ...
}
  • GSet is backed by a java.util.Set. processCommand receives an AddCommand and merges it through doAdd, which is simply Set.add; the command is forwarded to subscribers only if the element was not already present

TwoPhaseSet

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/TwoPSet.java

public class TwoPSet<E> extends AbstractSet<E> implements Crdt<TwoPSet<E>, TwoPSet.TwoPSetCommand<E>> {

    // fields
    private final String crdtId;
    private final Set<E> elements = new HashSet<>();
    private final Set<E> tombstone = new HashSet<>();
    private final Processor<TwoPSetCommand<E>, TwoPSetCommand<E>> commands = ReplayProcessor.create();


    // constructor
    public TwoPSet(String crdtId) {
        this.crdtId = Objects.requireNonNull(crdtId, "CrdtId must not be null");
    }


    // crdt
    @Override
    public String getCrdtId() {
        return crdtId;
    }

    @Override
    public void subscribe(Subscriber<? super TwoPSetCommand<E>> subscriber) {
        commands.subscribe(subscriber);
    }

    @Override
    public void subscribeTo(Publisher<? extends TwoPSetCommand<E>> publisher) {
        Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> {
            final Option<TwoPSetCommand<E>> newCommand = processCommand(command);
            newCommand.peek(commands::onNext);

        });
    }

    private Option<TwoPSetCommand<E>> processCommand(TwoPSetCommand<E> command) {
        if (command instanceof TwoPSet.AddCommand) {
            return doAdd(((TwoPSet.AddCommand<E>) command).getElement())? Option.of(command) : Option.none();
        } else if (command instanceof TwoPSet.RemoveCommand) {
            return doRemove(((TwoPSet.RemoveCommand<E>) command).getElement())? Option.of(command) : Option.none();
        }
        return Option.none();
    }


    // core functionality
    @Override
    public int size() {
        return elements.size();
    }

    @Override
    public Iterator<E> iterator() {
        return new TwoPSetIterator();
    }

    @Override
    public boolean add(E value) {
        final boolean changed = doAdd(value);
        if (changed) {
            commands.onNext(new TwoPSet.AddCommand<>(crdtId, value));
        }
        return changed;
    }


    // implementation
    private boolean doAdd(E value) {
        return !tombstone.contains(value) && elements.add(value);
    }

    private boolean doRemove(E value) {
        return tombstone.add(value) | elements.remove(value);
    }

    // ...
}
  • TwoPSet is backed by two sets: elements for additions and tombstone for removals. processCommand receives a TwoPSetCommand, which has two subclasses, TwoPSet.AddCommand and TwoPSet.RemoveCommand. doAdd adds the element to elements only if tombstone does not contain it; doRemove adds the element to tombstone and removes it from elements. Once an element is in tombstone it can never be added again

ORSet

wurmloch-crdt/src/main/java/com/netopyr/wurmloch/crdt/ORSet.java

public class ORSet<E> extends AbstractSet<E> implements Crdt<ORSet<E>, ORSet.ORSetCommand<E>> /*, ObservableSet<E> */ {

    // fields
    private final String crdtId;
    private final Set<Element<E>> elements = new HashSet<>();
    private final Set<Element<E>> tombstone = new HashSet<>();
    private final Processor<ORSetCommand<E>, ORSetCommand<E>> commands = ReplayProcessor.create();


    // constructor
    public ORSet(String crdtId) {
        this.crdtId = Objects.requireNonNull(crdtId, "Id must not be null");
    }


    // crdt
    @Override
    public String getCrdtId() {
        return crdtId;
    }

    @Override
    public void subscribe(Subscriber<? super ORSetCommand<E>> subscriber) {
        commands.subscribe(subscriber);
    }

    @Override
    public void subscribeTo(Publisher<? extends ORSetCommand<E>> publisher) {
        Flowable.fromPublisher(publisher).onTerminateDetach().subscribe(command -> {
            final Option<ORSetCommand<E>> newCommand = processCommand(command);
            newCommand.peek(commands::onNext);

        });
    }

    private Option<ORSetCommand<E>> processCommand(ORSetCommand<E> command) {
        if (command instanceof AddCommand) {
            return doAdd(((AddCommand<E>) command).getElement())? Option.of(command) : Option.none();
        } else if (command instanceof RemoveCommand) {
            return doRemove(((RemoveCommand<E>) command).getElements())? Option.of(command) : Option.none();
        }
        return Option.none();
    }


    // core functionality
    @Override
    public int size() {
        return doElements().size();
    }

    @Override
    public Iterator<E> iterator() {
        return new ORSetIterator();
    }

    @Override
    public boolean add(E value) {
        final boolean contained = doContains(value);
        prepareAdd(value);
        return !contained;
    }


    // implementation
    private static <U> Predicate<Element<U>> matches(U value) {
        return element -> Objects.equals(value, element.getValue());
    }

    private synchronized boolean doContains(E value) {
        return elements.parallelStream().anyMatch(matches(value));
    }

    private synchronized Set<E> doElements() {
        return elements.parallelStream().map(Element::getValue).collect(Collectors.toSet());
    }

    private synchronized void prepareAdd(E value) {
        final Element<E> element = new Element<>(value, UUID.randomUUID());
        commands.onNext(new AddCommand<>(getCrdtId(), element));
        doAdd(element);
    }

    private synchronized boolean doAdd(Element<E> element) {
        return (elements.add(element) | elements.removeAll(tombstone)) && (!tombstone.contains(element));
    }

    private synchronized void prepareRemove(E value) {
        final Set<Element<E>> removes = elements.parallelStream().filter(matches(value)).collect(Collectors.toSet());
        commands.onNext(new RemoveCommand<>(getCrdtId(), removes));
        doRemove(removes);
    }

    private synchronized boolean doRemove(Collection<Element<E>> removes) {
        return elements.removeAll(removes) | tombstone.addAll(removes);
    }

    // ...
}
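  • ORSet is also backed by two sets, elements for additions and tombstone for removals, but both hold Element<E> wrappers that pair each value with a UUID tag. processCommand receives an ORSetCommand, which has two subclasses, ORSet.AddCommand and ORSet.RemoveCommand, handled by doAdd and doRemove respectively. On a local add, prepareAdd wraps the value in a new Element with a random UUID, publishes an AddCommand and calls doAdd, which adds the element to elements, removes any tombstoned elements from elements, and returns true only if something changed and the element itself is not in tombstone. On a local remove, prepareRemove collects every Element in elements that matches the value (all observed tags), publishes a RemoveCommand and calls doRemove, which removes those elements from elements and adds them to tombstone. An add concurrent with a remove carries a fresh UUID that the remove has not observed, so the add wins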

Summary

  • CRDT is short for Conflict-free Replicated Data Type, also known as passive synchronisation. Implementations fall into state-based (CvRDT), operation-based (CmRDT), delta-based, pure operation-based, and so on
  • CvRDT, short for Convergent Replicated Data Type and also known as active synchronisation, is commonly used in file systems such as NFS, AFS and Coda, and in key-value stores such as Riak and Dynamo; CmRDT, short for Commutative Replicated Data Type, is commonly used in cooperative systems such as Bayou, Rover, IceCube and Telex
  • For a CRDT to be replicated without conflicts, the operations on its data structures must be convergent, i.e., associative, commutative and idempotent. The basic CRDT data types are Counters (G-Counter, PN-Counter), Registers (LWW-Register, MV-Register) and Sets (G-Set, 2P-Set, LWW-element-Set, OR-Set)

doc

  • Talk about CRDT
  • CRDT: A powerful tool for solving eventual consistency problems
  • Akka Distributed Data Deep Dive
  • Conflict-free replicated data types
  • CRDT: Conflict-free Replicated Data Types
  • Introduction to Conflict-Free Replicated Data Types
  • A Look at Conflict-Free Replicated Data Types (CRDT)
  • Conflict-free Replicated Data Types
  • A comprehensive study of Convergent and Commutative Replicated Data Types
  • wurmloch-crdt