Summary: Describes how to import line-of-business data directly from Neo4j to Nebula Graph using the official ETL tool Exchange, and the problems and optimizations encountered during the import process.

Nebula Forum: discuss.nebula-graph.com.cn/t/topic/204…

1 Background

With the continuous growth of business data, the requirements for real-time update and query efficiency of the graph database are also increasing, and Neo4j shows obvious performance deficiencies. The open source Neo4j Community edition only supports standalone deployment, so its scalability is poor and it cannot meet the business requirements of linearly scaling read/write performance or of read/write separation. The Community edition also limits the total number of vertices and edges. Meanwhile, the Neo4j Enterprise edition causal cluster has a performance bottleneck for real-time Cypher writes on its single master node.

Nebula Graph features a shared-nothing distributed architecture with no single-master write bottleneck, scales linearly, and supports datasets with hundreds of billions of vertices and trillions of edges.

This article describes how to import line-of-business data directly from Neo4j to Nebula Graph using the official ETL tool Exchange, as well as the problems and optimizations encountered during the import. Most of the problems were resolved with community support through forum posts; this article lists them one by one.

2 Deployment Environment

System environment:

  • CPU Name: Intel(R) Xeon(R) Silver 4114 CPU @ 2.20GHz
  • CPU Cores: 40
  • Memory Size: 376 GB
  • Disk: HDD
  • System: CentOS Linux Release 7.4.1708 (Core)

Software environment:

  • Neo4j: version 3.4, five-node causal cluster
  • Nebula Graph:
    • Nebula Graph v1.1.0
    • Deployment: a three-node Nebula Graph cluster deployed on a single server
  • Exchange: JAR package compiled from the nebula-java v1.1.0 source code
  • Warehouse environment:
    • Hadoop 2.7.4
    • Spark 2.3.1

Note on port allocation when deploying multiple Nebula nodes on a single machine: each storaged also uses the user-configured port number + 1 for internal use; otherwise you may hit the issue described in the forum post "Error: Get UUID Failed when Nebula imported data from Neo4j".
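To illustrate, here is a sketch of the --port flag in each instance's nebula-storaged.conf when three storaged processes share one host (the port values are examples, not our actual configuration):

# instance 1: data service on 44500; 44501 is implicitly occupied for internal use
--port=44500
# instance 2: leave a gap of at least 2 so the internal ports never clash
--port=44510
# instance 3
--port=44520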

3 Importing Full and Incremental Data

3.1 Full Import

Create Nebula Graph's Tag and Edge schema from the Neo4j vertex and edge properties. Note that the business may have added certain properties to only some of the vertices and edges, leaving them NULL on the others. It is therefore necessary to confirm all vertex and edge property information with the business side first, to avoid omitting any properties. Nebula Graph's schema is similar to MySQL's in that it supports CREATE and ALTER on properties, and all Tag and Edge metadata is kept consistent.

1. Create Tag and Edge in Nebula Graph

# Create the graph space: 10 partitions, 3 storage replicas
CREATE SPACE test(partition_num=10, replica_factor=3);
USE test;
# Tag tagA
CREATE TAG tagA(vid string, field-a0 string, field-a1 bool, field-a2 double);
# Tag tagB
CREATE TAG tagB(vid string, field-b0 string, field-b1 bool, field-b2 double);
# Edge edgeAB
CREATE EDGE edgeAB(vid string, field-e0 string, field-e1 bool, field-e2 double);
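If a property does turn out to have been omitted, the schema can be extended in place. A minimal sketch (the added property name is hypothetical):

# Add a property that was missed during the initial schema design
ALTER TAG tagA ADD (new_field string);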

2. Write the Exchange import configuration file

  • Exchange does not currently support bolt+routing. If the source is a causal cluster, you can choose a follower node and read data directly in bolt mode to reduce pressure on the cluster.
  • The vids of our Neo4j vertices and edges are of string type. Nebula v1.x does not support string VIDs (v2.0 does). "When the number of vertices reaches the billions, there is a certain probability of collision when generating VIDs with the hash function, so Nebula Graph provides the uuid() function to avoid VID collisions for large numbers of vertices." We chose uuid() as the conversion function; importing with it is less efficient than with hash(), and uuid() may have compatibility issues in future versions. (See the query sketch after this list.)
  • partition: the number of partitions into which Exchange splits the data pulled from Neo4j.
  • batch: the batch size for batched inserts into Nebula.
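Because uuid()/hash() replaces the original string vid as the stored VID, queries after the import must wrap the string in the same function. A minimal sketch, assuming the tagA schema above and Nebula 1.x syntax (the vid value is made up):

# policy "hash": fast, but with some collision probability at billion-vertex scale
FETCH PROP ON tagA hash("neo4j-vid-0");
# policy "uuid": collision-free mapping, but slower to import
FETCH PROP ON tagA uuid("neo4j-vid-0");

The full Exchange configuration file we used is below.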
{
  # Spark relation config
  spark: {
    app: {
      name: Spark Writer
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    cores {
      max: 16
    }
  }

  # Nebula Graph relation config
  nebula: {
    address:{
      graph:["xxx.xxx.xxx.xx:3699"]
      meta:["xxx.xxx.xxx.xx:45500"]
    }
    user: user
    pswd: password
    space: test

    connection {
      timeout: 3000
      retry: 3
    }

    execution {
      retry: 3
    }

    error: {
      max: 32
      output: /tmp/errors
    }

    rate: {
      limit: 1024
      timeout: 1000
    }
  }
  
  # Processing tags
  tags: [
    # Loading tag from neo4j
    {
      name: tagA
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://xxx.xxx.xxx.xxx:7687"
      user: neo4j
      password: neo4j
      exec: "match (n:tagA) where id(n) < 300000000 return n.vid as vid, n.field-a0 as field-a0, n.field-a1 as field-a1, n.field-a2 as field-a2 order by id(n)"
      fields: [vid, field-a0, field-a1, field-a2]
      nebula.fields: [vid, field-a0, field-a1, field-a2]
      vertex: {
        field: vid
        policy: "uuid"
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
    # Loading tag from neo4j
    {
      name: tagB
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://xxx.xxx.xxx.xxx:7687"
      user: neo4j
      password: neo4j
      exec: "match (n:tagB) where id(n) < 300000000 return n.vid as vid, n.field-b0 as field-b0, n.field-b1 as field-b1, n.field-b2 as field-b2 order by id(n)"
      fields: [vid, field-b0, field-b1, field-b2]
      nebula.fields: [vid, field-b0, field-b1, field-b2]
      vertex: {
        field: vid
        policy: "uuid"
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
  ]

  # Processing edges
  edges: [
   # Loading edges from neo4j
    {
      name: edgeAB
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://xxx.xxx.xxx.xxx:7687"
      user: neo4j
      password: neo4j
      exec: "match (a:tagA)-[r:edgeAB]->(b:tagB) where id(r) < 300000000 return n.vid as vid, n.field-e0 as field-e0, n.field-e1 as field-e1, n.field-e2 as field-e2 order by id(r)"
      fields: [vid, field-e0, field-e1, field-e2]
      nebula.fields: [vid, field-e0, field-e1, field-e2]
      source: {
        field: a.vid
        policy: "uuid"
      }
      target: {
        field: b.vid
        policy: "uuid"
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
  ]
}

3. Run the import command

nohup spark-submit --class com.vesoft.nebula.tools.importer.Exchange --master "local" exchange-1.1.0.jar -c test.conf > test.log &

4. View the amount of data imported into Nebula Graph

./bin/db_dump --space=test --db_path=./data/storage/nebula/ --meta_server=127.0.0.1:45500 --limit 0 --mode=stat --tags=tagA,tagB --edges=edgeAB

Note: Nebula 1.x currently supports such statistics only through db_dump; 2.0 will support statistics via nGQL.

3.2 Incremental Import

Incremental data is partitioned by the auto-incrementing internal id() of Neo4j's vertices and edges: when executing the Neo4j Cypher statement under the exec entry of the configuration file, an id() range restriction is added. The precondition is that the business must stop deleting data. Otherwise, if previously imported data is deleted during the incremental import, Neo4j will reuse its id(), and the incremental data imported under a reused id() cannot be queried, resulting in data loss. Of course, if the business can support double-writing to both Neo4j and Nebula, incremental imports do not have this problem.

exec: "match (n:user) where id(n) >= 300000000 and id(n) < 400000000 return xxx order by id(n)"
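A hedged companion query for choosing the window boundary, following the user label in the example above: record the current maximum internal id before each batch, and use it as the lower bound of the next window.

match (n:user) return max(id(n)) as max_id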

For details on how to do an incremental import from Neo4j to Nebula, see the forum post

3.3 Import Problems and Solutions

I ran into two problems while importing with Exchange; both were promptly supported and solved by the official @nicole. For details, see the following two posts:

  • When Nebula imports data from Neo4j, some attributes contain carriage returns; is there a solution?
  • Importing Nebula from Neo4j using Exchange failed because some of the vertices in the label had null property values

Problem 1: Exchange does not support escaping special characters such as newlines and carriage returns. When string data contains a carriage return, the spliced INSERT statement fails to execute because of the line break.

PR: github.com/vesoft-inc/… has been merged into the Exchange v1.0 branch.
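Since the PR link above is truncated, here is a minimal Scala sketch of the kind of escaping involved; the object, function name, and escaping rules are illustrative, not the actual PR code:

object EscapeSketch {
  // Escape characters that would otherwise break the spliced INSERT statement.
  def escapeForInsert(value: String): String =
    value
      .replace("\\", "\\\\") // backslashes first, so later escapes are not doubled
      .replace("\n", "\\n")  // newline
      .replace("\r", "\\r")  // carriage return
      .replace("\"", "\\\"") // quotes delimiting the spliced string literal

  def main(args: Array[String]): Unit =
    println(escapeForInsert("line1\nline2")) // prints: line1\nline2
}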

Problem 2: Exchange does not support importing data whose attribute values are NULL. As mentioned in 3.1, the business may have added attributes to only some vertices and edges as required, leaving them NULL on the others, which causes errors during Exchange imports.

The solution, from post 2, is to modify com.vesoft.nebula.tools.importer.processor.Processor#extraValue, adding conversion values for the NULL type.

case NullType => {
  fieldTypeMap(field) match {
    case StringType  => ""
    case IntegerType => 0
    case LongType    => 0L
    case DoubleType  => 0.0
    case BooleanType => false
  }
}

4 Optimization of Import Efficiency

For optimization of import efficiency, please refer to the following two posts:

  • Performance issues about importing Nebula from Neo4j using Exchange
  • Exchange with spark-submit --master "local[16]" error

Optimization 1: Increase the partition and batch values in the import configuration to improve import efficiency.

Optimization 2: If vid is a string, use hash() in 1.x (version 2.0 supports string VIDs natively); if vid is an int, it can be used directly without a conversion policy.

Optimization 3: For spark-submit, changing --master to yarn-cluster is recommended; if yarn is not used, spark://ip:port can be configured instead. We increased Spark concurrency with spark-submit --master "local[16]", and import efficiency improved by more than 4x compared with "local"; in our test environment, the disk IO peak of the single machine hosting the three HDD-backed nodes reached 200-300 MB/s. However, concurrent imports with "local[16]" ran into a Hadoop cache problem, which was solved by adding fs.hdfs.impl.disable.cache=true to the HDFS configuration and restarting Hadoop. See the second post for details.
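Combining optimization 3 with the import command from section 3.1, the tuned submissions look roughly like this (a sketch; the jar and conf names follow section 3.1):

# Variant A: raise local parallelism to 16 cores (what we used; 4x+ faster than "local")
nohup spark-submit --class com.vesoft.nebula.tools.importer.Exchange --master "local[16]" exchange-1.1.0.jar -c test.conf > test.log &

# Variant B: submit to yarn (the Spark 2.x spelling of the deprecated "yarn-cluster");
# in cluster mode, test.conf must be reachable from the driver node, e.g. shipped via --files
nohup spark-submit --class com.vesoft.nebula.tools.importer.Exchange --master yarn --deploy-mode cluster exchange-1.1.0.jar -c test.conf > test.log &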

5 Conclusion

While using Exchange to import data from Neo4j into Nebula Graph, we brought our problems to the community and received quick responses and support from @nicole and others, which was crucial to putting Nebula Graph into practice alongside Neo4j. Thanks to the community for its support. We look forward to Nebula Graph 2.0's openCypher support.

6 Reference Links

  1. nebula-graph.com.cn/posts/how-t…
  2. github.com/vesoft-inc/…
  3. docs.nebula-graph.com.cn/manual-CN/2…
  4. arganzheng.life/hadoop-file…

Recommended reading

  • Some practical details in Spark data import
  • Neo4j imports the principles and practices of Nebula Graph