Requirements background

Vector retrieval

Roughly 80% of the data generated worldwide every day is unstructured. The remaining 20%, structured data, is served by mature and powerful database infrastructure for storage and analysis. In other words, about 80% of data is not well mined, because basic software for analyzing unstructured data is lacking.

Deep learning models can extract features from unstructured data and produce vectors. Such a vector is essentially a mapping of the unstructured data into a high-dimensional space: the key information in the data is encoded into the vector. Processing, storing, and analyzing these high-dimensional vectors can then solve problems that traditional techniques cannot. In this way, vector retrieval broadens the boundaries of what computers can do.

Any scenario that needs to retrieve unstructured data can use vector retrieval (it may be the only practical method), and this covers almost all AI scenarios.

In AI applications, the vector that a model computes to represent a piece of unstructured data is called an embedding.

Understanding embeddings

Visual Embedding

KNN problem

How do we retrieve the K nearest neighbors of a query from a massive dataset? "Nearest" is measured by a distance between two points, and the distance function is defined by the user (much like the user-defined comparison function passed to a sorting algorithm).

Common choices include Euclidean distance, cosine similarity, and inner product.

Distance measures for feature vectors

Similarity calculation method
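As a minimal sketch of the three measures named above (plain Python, assuming non-zero vectors of equal length; the function names are illustrative and not taken from any library):

```python
import math

def euclidean(a, b):
    """Euclidean (L2) distance: smaller means more similar."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def inner_product(a, b):
    """Inner (dot) product: larger means more similar."""
    return sum(x * y for x, y in zip(a, b))

def cosine_similarity(a, b):
    """Cosine of the angle between a and b: larger means more similar.

    Assumes neither vector is all zeros (otherwise this divides by zero).
    """
    norm_a = math.sqrt(inner_product(a, a))
    norm_b = math.sqrt(inner_product(b, b))
    return inner_product(a, b) / (norm_a * norm_b)

a, b = [3.0, 0.0], [0.0, 4.0]
print(euclidean(a, b))          # 5.0
print(inner_product(a, b))      # 0.0
print(cosine_similarity(a, b))  # 0.0 (orthogonal vectors)
```

Note that for vectors normalized to unit length, ranking by inner product and ranking by cosine similarity give the same order.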

Introduction to basic KNN algorithms

Brute Force

Compute the distance between the query and every vector (an exhaustive search), then return the top-K results by distance.
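A minimal sketch of the exhaustive search, assuming Euclidean distance and an in-memory list of vectors (`brute_force_knn` is a hypothetical helper name):

```python
import heapq
import math

def brute_force_knn(query, vectors, k, distance=math.dist):
    """Return the k (distance, index) pairs closest to query, nearest first."""
    scored = ((distance(query, v), i) for i, v in enumerate(vectors))
    # nsmallest keeps only k items in memory while scanning every vector.
    return heapq.nsmallest(k, scored)

vectors = [[0.0, 0.0], [1.0, 1.0], [5.0, 5.0], [0.5, 0.0]]
print(brute_force_knn([0.0, 0.0], vectors, k=2))  # [(0.0, 0), (0.5, 3)]
```

The cost is one distance computation per stored vector for every query, which is why the approximate methods below exist.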

K-means

The original paper

Based on clustering: compute the centroid of each cluster, then quickly narrow down the nearest-neighbor candidates by comparing the query vector with the centroids.

The problem is that points near a cluster boundary are separated from their neighbors: the true nearest neighbor of a boundary point may lie in an adjacent cluster.
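The idea can be sketched as a small inverted-file index. This is an illustration in plain Python, not the original paper's code; `build_ivf`, `ivf_search`, and the `nprobe` parameter (how many nearby clusters to scan) are names assumed here. Scanning more than one cluster is a common way to soften the boundary problem described above.

```python
import math
import random

def nearest_centroids(v, centroids, n):
    """Indices of the n centroids closest to v."""
    order = sorted(range(len(centroids)), key=lambda c: math.dist(v, centroids[c]))
    return order[:n]

def build_ivf(vectors, n_clusters, n_iter=10, seed=0):
    """Run Lloyd's k-means, then build one inverted list of vector ids per cluster."""
    rng = random.Random(seed)
    centroids = [list(v) for v in rng.sample(vectors, n_clusters)]
    for _ in range(n_iter):
        groups = [[] for _ in centroids]
        for v in vectors:
            groups[nearest_centroids(v, centroids, 1)[0]].append(v)
        # Move each centroid to the mean of its members (keep it if empty).
        centroids = [[sum(col) / len(g) for col in zip(*g)] if g else c
                     for g, c in zip(groups, centroids)]
    lists = [[] for _ in centroids]
    for i, v in enumerate(vectors):
        lists[nearest_centroids(v, centroids, 1)[0]].append(i)
    return centroids, lists

def ivf_search(query, vectors, centroids, lists, k, nprobe=1):
    """Scan only the nprobe clusters whose centroids are closest to the query."""
    candidates = [i for c in nearest_centroids(query, centroids, nprobe) for i in lists[c]]
    return sorted(candidates, key=lambda i: math.dist(query, vectors[i]))[:k]

rng = random.Random(1)
data = [[rng.random(), rng.random()] for _ in range(200)]
centroids, lists = build_ivf(data, n_clusters=8)
query = [0.5, 0.5]
print(ivf_search(query, data, centroids, lists, k=3, nprobe=1))  # fast, may miss neighbors
print(ivf_search(query, data, centroids, lists, k=3, nprobe=8))  # scans every cluster: exact
```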

KD Tree

The original paper

To build the tree, compute the variance of each dimension and split on the dimension with the largest variance. The median point along that dimension becomes the split point, which yields the left and right subtrees, and the construction recurses on each side.

To query, descend recursively to the leaf node of the KD Tree that contains the target point, then backtrack to the parent node. Draw a hypersphere centered on the target point whose radius is the distance to the best point found so far, and check whether the region of the parent's other child intersects it. If it does, search that subtree and update the nearest distance. Continue backtracking in this way until the search is complete.
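The build and query steps can be sketched as follows. This is a simplified single-nearest-neighbor version in plain Python, written for illustration rather than taken from the original paper; `Node`, `build`, and `nearest` are assumed names.

```python
import math
import random
from statistics import pvariance

class Node:
    def __init__(self, point, dim, left, right):
        self.point, self.dim, self.left, self.right = point, dim, left, right

def build(points):
    """Recursively split on the dimension with the largest variance."""
    if not points:
        return None
    n_dims = len(points[0])
    dim = max(range(n_dims), key=lambda d: pvariance([p[d] for p in points]))
    points = sorted(points, key=lambda p: p[dim])
    mid = len(points) // 2  # the median point becomes this node
    return Node(points[mid], dim, build(points[:mid]), build(points[mid + 1:]))

def nearest(node, target, best=None):
    """Return (distance, point) for the nearest neighbor of target."""
    if node is None:
        return best
    d = math.dist(target, node.point)
    if best is None or d < best[0]:
        best = (d, node.point)
    diff = target[node.dim] - node.point[node.dim]
    near, far = (node.left, node.right) if diff < 0 else (node.right, node.left)
    best = nearest(near, target, best)
    # Backtrack: the other side can only hold a closer point if the
    # splitting plane is nearer than the best distance found so far.
    if abs(diff) < best[0]:
        best = nearest(far, target, best)
    return best

rng = random.Random(0)
points = [(rng.random(), rng.random(), rng.random()) for _ in range(200)]
tree = build(points)
print(nearest(tree, (0.5, 0.5, 0.5)))
```

The pruning test is what saves work in low dimensions; in high-dimensional embedding spaces the hypersphere tends to cross most splitting planes, so the search degrades toward a full scan.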

Annoy

The original paper

Each non-leaf node is a hyperplane that splits the space according to cluster centroids, and each leaf is a bucket holding up to K vectors. Together they form a binary tree.

LSH (locality-sensitive hashing)

Author’s Home page

Apply K locality-sensitive hash functions to map the query to K buckets, take the union or intersection of the bucket contents as candidates, and return the top-K results.
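One well-known LSH family for cosine similarity uses random hyperplanes. The sketch below assumes that family and takes the union of buckets across tables; the class name, `n_tables`, and `n_bits` are illustrative choices, not something the source specifies.

```python
import math
import random
from collections import defaultdict

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

class RandomHyperplaneLSH:
    def __init__(self, dim, n_tables=4, n_bits=8, seed=0):
        rng = random.Random(seed)
        # One independent set of random hyperplanes (normal vectors) per table.
        self.planes = [[[rng.gauss(0, 1) for _ in range(dim)] for _ in range(n_bits)]
                       for _ in range(n_tables)]
        self.tables = [defaultdict(list) for _ in range(n_tables)]
        self.vectors = []

    def _key(self, planes, v):
        # Each bit records which side of one hyperplane the vector falls on.
        return tuple(sum(p * x for p, x in zip(plane, v)) >= 0 for plane in planes)

    def add(self, v):
        idx = len(self.vectors)
        self.vectors.append(v)
        for planes, table in zip(self.planes, self.tables):
            table[self._key(planes, v)].append(idx)

    def query(self, q, k):
        # Union of the buckets the query hashes to, then exact re-ranking.
        candidates = set()
        for planes, table in zip(self.planes, self.tables):
            candidates.update(table.get(self._key(planes, q), []))
        return sorted(candidates, key=lambda i: -cosine(q, self.vectors[i]))[:k]

rng = random.Random(1)
data = [[rng.gauss(0, 1) for _ in range(16)] for _ in range(100)]
index = RandomHyperplaneLSH(dim=16)
for v in data:
    index.add(v)
print(index.query(data[0], k=3))
```

Taking the union favors recall, while an intersection of buckets would favor precision at the cost of missing more neighbors.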

PQ (product quantization)

The original paper

A 128-dimensional vector is split into 4 segments, giving four 32-dimensional sub-vectors. Each of the 4 subspaces is clustered separately with K-means, usually into 256 centroids, which yields a codebook of 4 × 256 entries. Each stored vector is then encoded as the IDs of the 4 centroids nearest to its sub-vectors. At query time, the query is split the same way and the distance from each query sub-vector to the centroids of its subspace is computed. The asymmetric distance to a stored vector is the sum of the 4 sub-distances selected by its code, and it approximates the true distance.

A coarse quantization step can be applied first: K-means clusters the dataset into, for example, 1024 classes so that the region of interest can be located quickly. The vectors in that region are then searched using the PQ distance computation. This combination is called IVF-PQ.
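The encoding and the asymmetric distance can be sketched as below. To keep the example fast it uses 8-dimensional vectors, 4 segments, and 16 centroids per subspace instead of the 128 / 4 / 256 configuration described above; all function names are illustrative, and the coarse IVF step is left out.

```python
import math
import random

def kmeans(points, k, n_iter=10, seed=0):
    rng = random.Random(seed)
    centroids = [list(p) for p in rng.sample(points, k)]
    for _ in range(n_iter):
        groups = [[] for _ in range(k)]
        for p in points:
            groups[min(range(k), key=lambda c: math.dist(p, centroids[c]))].append(p)
        centroids = [[sum(col) / len(g) for col in zip(*g)] if g else c
                     for g, c in zip(groups, centroids)]
    return centroids

def split(v, m):
    """Cut v into m equal-length sub-vectors."""
    step = len(v) // m
    return [v[i * step:(i + 1) * step] for i in range(m)]

def train(vectors, m, k):
    """One codebook of k centroids per subspace."""
    return [kmeans([split(v, m)[j] for v in vectors], k, seed=j) for j in range(m)]

def encode(v, codebooks):
    """Replace each sub-vector with the id of its nearest centroid."""
    return [min(range(len(cb)), key=lambda c: math.dist(sub, cb[c]))
            for sub, cb in zip(split(v, len(codebooks)), codebooks)]

def adc_table(query, codebooks):
    """Squared distance from each query sub-vector to every centroid."""
    return [[sum((a - b) ** 2 for a, b in zip(sub, c)) for c in cb]
            for sub, cb in zip(split(query, len(codebooks)), codebooks)]

def adc_distance(code, table):
    """Asymmetric distance: sum of the looked-up sub-distances."""
    return sum(table[j][c] for j, c in enumerate(code))

rng = random.Random(0)
data = [[rng.random() for _ in range(8)] for _ in range(200)]
codebooks = train(data, m=4, k=16)
codes = [encode(v, codebooks) for v in data]
table = adc_table(data[0], codebooks)
ranked = sorted(range(len(data)), key=lambda i: adc_distance(codes[i], table))
print(ranked[:5])
```

The distance is called asymmetric because the query stays uncompressed while the stored vectors are represented only by their codes. Each stored vector costs 4 small integers instead of its full float representation.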

HNSW algorithm

The original paper

Use a graph structure to store indexes

Insert and search process:

  1. Maintain a top-K heap. When inserting a node, start from a randomly selected node in the top layer and fetch its neighbors (its "friend" nodes). Compute the distance from each neighbor to the target vector and push it into the top-K min-heap with the distance as the score. Take the node at the top of the heap as the next hop and repeat until the search of the current layer finishes, which yields a top-K node list. The target point is then linked as a neighbor to the nodes in that list.

  2. The node at the top of the top-K heap is then used as the starting point for the next layer down, and the search above is repeated until the bottom layer is reached. At that point the nodes in the top-K heap are the KNN retrieval results.

  3. Searching follows the same procedure, minus the step that creates new neighbor links.
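The steps above can be sketched as a heavily simplified layered graph. This is not the algorithm from the original paper: it assumes a fixed entry point instead of a random one, a coin-flip level distribution, and no pruning of neighbor lists. `TinyHNSW`, `m`, and `ef` are names chosen for illustration.

```python
import heapq
import math
import random

class TinyHNSW:
    def __init__(self, m=8, ef=32, seed=0):
        self.m, self.ef = m, ef
        self.rng = random.Random(seed)
        self.vectors = []
        self.layers = []   # layers[l] maps node id -> list of neighbor ids
        self.entry = None  # a node that exists in the top layer

    def _search_layer(self, q, entry, ef, layer):
        """Best-first search in one layer; up to ef (distance, id) pairs, nearest first."""
        d0 = math.dist(q, self.vectors[entry])
        visited = {entry}
        candidates = [(d0, entry)]  # min-heap of nodes still to expand
        best = [(-d0, entry)]       # max-heap (negated) of the ef closest so far
        while candidates:
            d, node = heapq.heappop(candidates)
            if d > -best[0][0]:
                break  # the closest open candidate cannot improve the result
            for nb in self.layers[layer][node]:
                if nb in visited:
                    continue
                visited.add(nb)
                dn = math.dist(q, self.vectors[nb])
                if len(best) < ef or dn < -best[0][0]:
                    heapq.heappush(candidates, (dn, nb))
                    heapq.heappush(best, (-dn, nb))
                    if len(best) > ef:
                        heapq.heappop(best)
        return sorted((-d, n) for d, n in best)

    def add(self, v):
        idx = len(self.vectors)
        self.vectors.append(v)
        level = 0
        while self.rng.random() < 0.5:  # assumed level distribution
            level += 1
        top = len(self.layers) - 1
        ep = self.entry
        for layer in range(top, level, -1):  # greedy descent above the node's level
            ep = self._search_layer(v, ep, 1, layer)[0][1]
        for layer in range(min(level, top), -1, -1):
            found = self._search_layer(v, ep, self.ef, layer)
            neighbors = [n for _, n in found[:self.m]]
            self.layers[layer][idx] = neighbors
            for n in neighbors:  # "make friends" in both directions
                self.layers[layer][n].append(idx)
            ep = found[0][1]
        for _ in range(top + 1, level + 1):  # new top layers hold only this node
            self.layers.append({idx: []})
            self.entry = idx

    def search(self, q, k):
        """Same walk as add, without creating links. Requires a non-empty index."""
        ep = self.entry
        for layer in range(len(self.layers) - 1, 0, -1):
            ep = self._search_layer(q, ep, 1, layer)[0][1]
        return self._search_layer(q, ep, max(self.ef, k), 0)[:k]

rng = random.Random(0)
index = TinyHNSW()
for _ in range(500):
    index.add([rng.random() for _ in range(4)])
print(index.search([0.5, 0.5, 0.5, 0.5], k=3))
```

A production implementation also caps and prunes each node's neighbor list, which this sketch omits, so node degree here can grow without bound.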

Composite index

An overview of KNN (ANN) vector retrieval

Brute-force search has 100% recall. It can be viewed as k-means in which every vector is its own centroid, or as k-means with a single cluster. Partitioning the data around a large number of centroids shortens each cluster's inverted list, and because only the centroids are stored in the graph, this avoids the excessive memory use of running HNSW over every vector. The centroids themselves are then organized with HNSW, which addresses the low recall of plain clustering.

Simple solution

Whatever the indexing algorithm, an online index service follows the same basic steps:

  1. Build index files offline

  2. Distribute index files to the server

  3. Load index files to build in-memory data structures

  4. The server provides external index queries

Problem constraints

  1. Because the data volume is large, index files must be built within hours
  2. Retrieval accuracy and performance must hold up on large-scale data
  3. Data is generated in real time, so indexes must be updated in real time to reflect incremental information and improve business metrics
  4. When the business logic is complex, callers can specify label attributes to narrow the search range and speed up queries
  5. No single algorithm suits every scenario, so different index-building algorithms must be supported
  6. Complex indexing algorithms have many parameters. How should these hyperparameters be tuned?

  1. Ten-billion-scale vectors (4 × 128 × 10 billion bytes ≈ 5 TB), built offline within 5 hours
  2. Support hundreds of index models, with multiple versions built offline and served online at the same time
  3. Support top-K queries with K up to 5000 (0 <= k <= 5000) at a p99 latency of 10 ms
  4. Dynamically modify real-time indexes within milliseconds

Architecture derivation

What is the core domain object? -> The index

Basic architecture

Building index files within hours

Index building is a computationally intensive task that requires a parallel data processing architecture

MapReduce paper introduction

GFS paper introduction

Millisecond KNN queries for large datasets, with maximum accuracy guaranteed

Data intensive system design

Among KNN algorithms, graph-based retrieval has the highest accuracy, and HNSW performs best. Its drawback is high memory consumption: a vector index at the ten-billion scale is hard to fit on a single machine. The index therefore has to be sharded to scale horizontally. Sharding fragments the graph, so some edges are never built, which costs a little retrieval performance and accuracy, but the trade-off for horizontal scalability is worth it.

Sharding raises several questions:

  1. What is the basis for sharding?

     hash: vectors are assigned at random to N shard index files, so data is evenly distributed.
     range: vectors are sharded by sort order, which is unrealistic for vectors, and write requests create hot spots.
     clustering: vectors are clustered first and assigned by cluster centroid; the process is slow and the data distribution is unbalanced.

  2. How are requests routed and results aggregated?

     fan-out: highest accuracy, high bandwidth consumption.
     random: low accuracy, high randomness, low bandwidth consumption (recommended).
     clustering: high accuracy, low I/O consumption, high CPU consumption for the routing computation.

     To aggregate, sort by the returned distance as the score and select the top K.

  3. Who enforces the routing policy, the client or a proxy service?

     client: low cost, but less flexible, and it consumes the caller's server resources.
     proxy service: high cost, high flexibility, and switching is non-intrusive for callers.

  4. How are hot keys handled?

     redundant replicas: consistency under replication must be considered.
     local caching: consider eviction policies such as LRU and LFU, the hit ratio, and so on.

Replicating shard copies causes large-scale data-migration I/O and saturates network bandwidth, which affects both building and querying.

     master-slave replication: low replication latency, poor throughput, strong robustness.
     chain replication: high latency, high throughput, poor robustness.

Pipeline optimization: files are split into chunks during transfer, and each chunk is forwarded to the next node before the whole file has been received, making full use of network bandwidth.

How to incrementally build and update indexes in real time

     Feature producers insert directly into the Serving HNSW index via RPC: HNSW has poor insert performance, and inserts degrade query performance, so direct insertion is not viable in high-QPS scenarios.
     Small index shards are built in batches over time windows formed through an MQ: online indexes stay read-only, so query performance is unaffected, but many small shards accumulate and the results must be aggregated once more inside each Serving node. How long, or how large, should a window be? It is a trade-off between time and space.

  1. How many shards should be kept, and which ones should be queried?

    1. Maintain 128 shards
    2. Query the latest shard plus four others chosen at random, five shards in total

Better Architecture

Implementation principles

Meta Server design

  1. Schedule the offline index build process

  2. Maintain routing information for shards and replicas

  3. Run periodic health checks and reschedule failed tasks or Serving nodes to keep the cluster available

Model state machine

  1. The Meta Server manages the entire life cycle of a model build. A newly created index build task starts with the index status init.

  2. The Meta Server creates multiple build tasks for the current index, queues them to multiple Builder Servers, and changes the state to building. Each Builder Server pulls files from FS to local disk according to the file path configured in the task it received. It then invokes the hnSW_lite algorithm library to build the model, saves it locally, and synchronizes it to FS. Finally the Builder Server tells the Meta Server that the task is complete, and the model status is updated to trans.

  3. The Meta Server creates multiple trans tasks and dispatches them to the task queue to inform each Serving group that its latest master shard has been built. The master node of the Serving group pulls the model from FS, starts chain replication to synchronize the shard to the replicas, and notifies the Meta Server when synchronization is complete.

  4. When all trans tasks are complete, the state changes to loading and the loading tasks are queued.

  5. After the model is loaded, the tail node of the replication chain confirms it, the model status changes to online, and the model's version information is updated in etcd.

  6. When the Meta Server changes the model status to offline, routed shard requests for that model are rejected.

  7. On the platform side, an offline or online index can be set to update, and the index is then rebuilt with the new vectors.

  8. The Builder Server starts a new round of building by changing the version number in the path, so that it loads the vector files of the corresponding version from FS.
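The life cycle above can be sketched as an explicit transition table. The state names come from the steps above; the exact set of allowed edges (for example, that update leads back to building, and that failures are not modeled) is an assumption made for this illustration.

```python
from enum import Enum

class IndexState(Enum):
    INIT = "init"
    BUILDING = "building"
    TRANS = "trans"
    LOADING = "loading"
    ONLINE = "online"
    OFFLINE = "offline"
    UPDATE = "update"

S = IndexState
# Assumed legal transitions, read off the eight steps above.
TRANSITIONS = {
    S.INIT: {S.BUILDING},
    S.BUILDING: {S.TRANS},
    S.TRANS: {S.LOADING},
    S.LOADING: {S.ONLINE},
    S.ONLINE: {S.OFFLINE, S.UPDATE},
    S.OFFLINE: {S.UPDATE},
    S.UPDATE: {S.BUILDING},
}

def advance(current, target):
    """Return target if the transition is allowed, otherwise raise."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target

state = S.INIT
for nxt in (S.BUILDING, S.TRANS, S.LOADING, S.ONLINE, S.UPDATE, S.BUILDING):
    state = advance(state, nxt)
    print(state.value)
```

Keeping the table in one place lets a scheduler reject out-of-order task reports, such as a load completion arriving for an index that is still building.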

DAG task description

{
    "build": {
        "work_dir": "hdfs://biz/table/index/"
    },
    "trans": {
        "dep": ["build"]
    },
    "load": {
        "dep": ["trans"]
    },
    "shard_num": 3,
    "version": 1
}
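A description in this shape can be turned into an execution order with a topological sort. The sketch below uses Python's standard `graphlib` (Python 3.9+) and assumes that every dict-valued entry is a phase and every scalar entry is a plain setting; how the real scheduler interprets the file is not stated in the source.

```python
import json
from graphlib import TopologicalSorter

task_json = """
{
    "build": {"work_dir": "hdfs://biz/table/index/"},
    "trans": {"dep": ["build"]},
    "load": {"dep": ["trans"]},
    "shard_num": 3
}
"""

spec = json.loads(task_json)
# Map each phase to the set of phases it depends on.
graph = {name: set(cfg.get("dep", []))
         for name, cfg in spec.items() if isinstance(cfg, dict)}
order = list(TopologicalSorter(graph).static_order())
print(order)  # ['build', 'trans', 'load']

# Expand each phase into one task per shard.
tasks = [(phase, shard) for phase in order for shard in range(spec["shard_num"])]
print(len(tasks))  # 9
```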

Routing information

Use a map to speed up lookups, and keep some health-check information on each node entry

Serving/Query design

Service startup

  1. At startup, the server registers itself with etcd so that the Meta Server can discover it

  2. The Meta Server health-checks the Serving nodes. If the chain is unavailable, it triggers a replication-chain election within the group, which selects the node with the largest nodeID. This is implemented as a configuration change in etcd.

To avoid a Meta Server single point of failure, the replication chain can also run the election itself on top of etcd.

  3. Serving nodes get the group information from etcd and organize themselves into replication chains, choosing the nearest node by IP distance as the next link

Load model

  1. The master receives the task notification from the Meta Server and pulls files from FS into a separate folder per version

  2. While the files are being downloaded, their contents are streamed to the next node in the group

  3. After the download completes, the files are moved (mv) into the working directory, overwriting the old version, and the load logic is triggered

  4. Load the file contents into memory, build the new version of the model, and seamlessly switch the underlying model online

  5. Clear the list of real-time index models and prepare to build new real-time indexes from newly consumed Kafka messages

Online query

  1. A query is sent to the main index and, concurrently, to the latest real-time index and to other real-time indexes chosen at random

  2. When the timeout expires, the retrieval thread must be told to stop immediately so that thread resources are freed

  3. Aggregate the top K locally on the Serving machine, then aggregate the top K again in the proxy service

Proxy Server design

Distribution

  1. The Proxy periodically pulls the latest routing information from the Meta Server

  2. Validate the requested index model using its biz-table-index-version identifier

  3. The route_model field indicates the routing policy used. 1 indicates random, 2 indicates clustering, and 3 indicates fan-out

  4. With the random policy, the group list is taken from the route, and a quick shuffle picks random shards from 1 to 5 of those lists to request

  5. With the clustering policy, a specific group list is selected according to the label field, and a random shard is requested from each group

  6. With the fan-out policy, a random shard is requested from every group under the route
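The three policies can be sketched as one selection function. The shape of the route table, the `label` and `replicas` field names, and the reading of "1-5 lists" as "at most 5 groups" are all assumptions made for this illustration; only the `route_model` values 1, 2, and 3 come from the text above.

```python
import random

RANDOM, CLUSTERING, FAN_OUT = 1, 2, 3  # route_model values from the text

def pick_targets(route, route_model, label=None, max_groups=5, rng=random):
    """Return one (group, replica) pair per selected group.

    route is assumed to look like:
        {group_name: {"label": str, "replicas": [address, ...]}}
    """
    groups = list(route)
    if route_model == RANDOM:
        rng.shuffle(groups)
        groups = groups[:max_groups]
    elif route_model == CLUSTERING:
        groups = [g for g in groups if route[g]["label"] == label]
    elif route_model != FAN_OUT:
        raise ValueError(f"unknown route_model {route_model}")
    # In every policy, one random replica of each chosen group gets the request.
    return [(g, rng.choice(route[g]["replicas"])) for g in groups]

route = {
    "g1": {"label": "shoes", "replicas": ["10.0.0.1", "10.0.0.2"]},
    "g2": {"label": "bags", "replicas": ["10.0.1.1"]},
    "g3": {"label": "shoes", "replicas": ["10.0.2.1"]},
}
print(pick_targets(route, FAN_OUT))
print(pick_targets(route, CLUSTERING, label="shoes"))
print(pick_targets(route, RANDOM, max_groups=2))
```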

Aggregation

  1. A recall of 5K items is generally recommended. When calling Serving, the requested top K can be configured to half of the query count

  2. After receiving 2.5K × shardNum items, return the 5K items with the best scores
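A minimal sketch of the proxy-side merge, assuming each shard returns `(distance, item_id)` pairs already sorted nearest first and that a smaller distance is a better score (`merge_topk` is a hypothetical helper name):

```python
import heapq
from itertools import islice

def merge_topk(shard_results, k):
    """Merge per-shard sorted result lists and keep the k nearest overall."""
    # heapq.merge lazily merges already-sorted inputs, so only k items are consumed.
    return list(islice(heapq.merge(*shard_results), k))

query_count = 4
per_shard_k = query_count // 2  # each shard is asked for half of the final size
shard_a = [(0.10, "a1"), (0.40, "a2")][:per_shard_k]
shard_b = [(0.20, "b1"), (0.30, "b2")][:per_shard_k]
shard_c = [(0.05, "c1"), (0.90, "c2")][:per_shard_k]
print(merge_topk([shard_a, shard_b, shard_c], query_count))
# [(0.05, 'c1'), (0.1, 'a1'), (0.2, 'b1'), (0.3, 'b2')]
```

Asking each shard for fewer items than the final K saves bandwidth, at the risk of missing results when the best items are concentrated in a single shard.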

Directions for improvement

Performance optimization

Offline computation

Can you build ten billion vectors in five hours?

  1. Storing ten billion 128-dimensional FLOAT32 vectors requires 4 × 128 × 10 billion bytes ≈ 5 TB of file storage
  2. At least 100 indexes are online, so the total dataset to process is about 500 TB
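The storage figure can be checked with a few lines of arithmetic. The throughput line is an added back-of-the-envelope estimate of the rate needed just to read the raw data once within the 5-hour target, using decimal units:

```python
BYTES_PER_FLOAT32 = 4
DIMENSIONS = 128
NUM_VECTORS = 10_000_000_000  # ten billion
BUILD_HOURS = 5

total_bytes = BYTES_PER_FLOAT32 * DIMENSIONS * NUM_VECTORS
print(total_bytes / 1e12)  # 5.12 (TB)

read_rate = total_bytes / (BUILD_HOURS * 3600)
print(round(read_rate / 1e6))  # 284 (MB/s, aggregate across all builders)
```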

Stream the build state machine

  1. The Task Manager does not need to wait for every task in one DAG phase to finish before running tasks in the next phase
  2. It only needs to start the dependent next task as soon as a subtask completes, so the scheduling granularity drops from the phase (state) to the subtask

How to speed up floating-point calculations that are too slow?

  1. Use the FPU and GPU to speed up computation
  2. Use SIMD instruction sets (SSE, AVX) for parallel computation, loading 128 bits or more of data per instruction
  3. Store data as fixed-point numbers, reducing file size and index memory size
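One way to read the fixed-point idea is scalar quantization to 8-bit integers. The sketch below assumes components in [-1, 1] and a scale of 127; the source does not say which fixed-point format is actually used.

```python
import array

def quantize(vector, scale=127):
    """Map floats to int8 fixed-point values: q = round(x * scale), clamped."""
    return array.array("b", (max(-127, min(127, round(x * scale))) for x in vector))

def dequantize(q, scale=127):
    return [x / scale for x in q]

v = [0.5, -1.0, 0.25, 0.0]
q = quantize(v)
print(list(q))              # [64, -127, 32, 0]
print(q.itemsize * len(q))  # 4 bytes, versus 16 bytes as float32
print(dequantize(q))
```

The storage drops to a quarter of float32, and the rounding error per component is at most half a quantization step (0.5 / 127 here).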

Online distribution

The retrieval process creates a large number of small objects, causing garbage-collection pressure and memory fragmentation

Implement a slab allocator in user space to pool small objects

When a Serving process exits, restarting it causes service jitter

Store index data in shared memory, so that a restart after a failure or an upgrade does not have to reload it

Stability optimization

How can versions be rolled back quickly to improve availability?

  1. Download the file into the directory for its version
  2. Copy it to a tmp directory
  3. mv the tmp file into the working directory, overwriting the current version of the index file
  4. For a quick rollback or a fallback load, load the index file from the specified version directory
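The copy-then-rename steps can be sketched with the standard library. The directory layout and the `index.bin` file name are hypothetical; the tmp file is created inside the working directory here so that the final rename stays on one filesystem, which is what makes `os.replace` atomic on POSIX systems.

```python
import os
import shutil
import tempfile

def activate(version_dir, work_dir, filename="index.bin"):
    """Copy a versioned index file to a tmp file, then rename it into place."""
    os.makedirs(work_dir, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(dir=work_dir)
    os.close(fd)
    shutil.copyfile(os.path.join(version_dir, filename), tmp_path)
    # Readers see either the old file or the new one, never a partial copy.
    os.replace(tmp_path, os.path.join(work_dir, filename))

with tempfile.TemporaryDirectory() as root:
    for version in ("v1", "v2"):
        os.makedirs(os.path.join(root, version))
        with open(os.path.join(root, version, "index.bin"), "w") as f:
            f.write(version)
    work = os.path.join(root, "work")
    activate(os.path.join(root, "v2"), work)  # upgrade to v2
    activate(os.path.join(root, "v1"), work)  # roll back to v1
    with open(os.path.join(work, "index.bin")) as f:
        print(f.read())  # v1
```

Because every version keeps its own directory, a rollback is just another activation and needs no new download.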

Latency depends on the slowest operator. How should timeouts be handled?

  1. Give each retrieval task a deadline, and when the deadline passes, return the results collected so far
  2. Set a distance threshold: below it, return an empty result; otherwise truncate and return a fallback result
  3. On failure, return the locally available results, and cascade the cancellation so that thread resources are reclaimed quickly
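The deadline idea from the first step can be sketched as a cooperative check inside the scan loop. The function name, the `budget_s` parameter, and the injectable `clock` are assumptions for this illustration; a real Serving node would check the deadline inside its index traversal instead of a linear scan.

```python
import time

def search_with_deadline(candidates, score, k, budget_s, clock=time.monotonic):
    """Score candidates until the time budget runs out, then return what we have.

    Returns (top_k, timed_out). Lower scores are treated as better.
    """
    deadline = clock() + budget_s
    results = []
    timed_out = False
    for c in candidates:
        if clock() >= deadline:
            timed_out = True  # stop scanning and keep the partial results
            break
        results.append((score(c), c))
    results.sort()
    return results[:k], timed_out

top, timed_out = search_with_deadline(range(1000), lambda x: abs(x - 500), k=3, budget_s=1.0)
print(top, timed_out)
```

Passing the remaining budget down to each sub-request lets every layer stop at the same deadline, which is one way to get the cascading behavior described in the third step.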

The Meta Server is a single point of failure. How is the reliability of metadata ensured?

  1. All metadata is stored in etcd, and the Meta Server only performs scheduling computation (storage and compute are separated)
  2. The stateless Meta Server can scale horizontally, and caching results from the etcd storage tier improves performance

Cost optimization

To be continued….

Infrastructure upgrade

Cloud.tencent.com/developer/a…

Unified stream and batch processing: the log is the data

Tables and logs are dual, similar to how Raft maintains state machines through a consistent log. A table is the state at a moment in time, and a log is the stream of state changes

The offline build process is dropped. All data is managed as logs, logs are aggregated into snapshots, and indexes are built on the snapshots to serve queries

Mart architecture

Milvus paper

  1. At initialization, each node sends a registration event to the Event Bus, and the Meta Server builds the cluster state and writes it to etcd
  2. The Meta Server synchronizes the cluster state to all nodes through the Event Bus, and the cluster starts serving externally once all nodes are ready
  3. The write SDK calls an RPC to write data to the appender service, which performs authentication, data processing, and vector normalization
  4. After the vector write event is published to the bus, the Builder, which subscribes to the event, aggregates the written data in memory (bounded by time and space)
  5. The Builder calls the library on the aggregated data to build an index block, writes it synchronously to the object storage service, and sends an index block creation event to the Event Bus
  6. The Query node subscribes to the index block creation event, reads the index block file from OSS, and loads it into memory to serve external requests
  7. The Proxy node subscribes to meta cluster status events to learn how bizid-table-index-version is distributed across groups
  8. Query requests reach the proxy nodes through the SDK's load balancing (for example, consistent hashing)
  9. The Proxy splits a query into sub-query tasks for the query nodes according to the routing conditions and reduces the results they return
  10. The query node queries the index blocks that are hit, reduces the results, and returns them
  11. Snapshot events on the Event Bus can be used to recover a crashed node quickly
  12. The balance between freshness and throughput can be tuned by adjusting the time window of the index blocks
  13. An offline import tool is needed to support batch data import into the vector retrieval database during cluster initialization
  14. Each event has a globally unique ID, which is assigned by the Meta Server and can be allocated in batches to improve performance

References

  1. Distributed vector retrieval engine

  2. K-means original paper

  3. KD Tree original paper

  4. Annoy (ANN) original paper

  5. LSH original author home page

  6. PQ original paper

  7. Empowering Alibaba's multi-business scenarios: DAMO Academy opens up its self-developed vector search engine Proxima - DAMO Academy

  8. An introduction to the basic algorithms of vector retrieval

  9. KD tree and KD tree nearest neighbor algorithm - Juejin

  10. A review of KNN algorithms

  11. Fast Approximate Nearest Neighbor Search With The Navigating Spreading-out Graph

  12. Application of locally sensitive hashing in recommendation system

  13. An article to understand the HNSW algorithm theory of the ins and outs _U011233351 blog -CSDN blog

  14. Image retrieval: Reclassified ANN Search

  15. Design differences between CPU and GPU – Magnum Programm Life

  16. SIMD+SSE+AVX

  17. Use SIMD to accelerate Golang

  18. AVX512 golang/go Wiki

  19. An overview of KNN (ANN) vector retrieval