Learn about Spark projects on NebulaGraph

This article was first published in Nebula Graph Community Public Number

Recently, I tried to build a simple one-click Nebra Graph Spark Related projects, today they are organized and shared. What's more, I've traveled out and contributed to the documentation the way Nebula Spark Connector works under PySpark.

Three Spark Subprojects of NebulaGraph

I've drawn one around all the data import methods for NebulaGraph sketch , which already contains a brief introduction to Spark Connector, Nebula Exchange. In this article, I'll go a little deeper into them with another Nebra Algorithm.

Note: This document It is also clear that we have listed the choices of different import tools.


  • Nebula Spark Connector is a Spark Lib that enables Spark applications to read and write graph data from Nebula Graph as a dataframe.
  • Nebula Exchange is built on Nebula Spark Connector and is an application that Spark Lib can also execute directly by Spark submitting JAR packages. It is designed to exchange different data sources with Nebula Graph (for open source versions, it is one-way: write, for enterprise versions, it is two-way). Nebula Exchange supports many different types of data sources such as: MySQL,Neo4j,PostgreSQL,ClickHouse,Hive Wait. In addition to writing directly to NebulaGraph, it also has the option of generating SST files and injecting them into NebulaGraph to help sort the underlying layers using a computing power beyond the NebulaGraph cluster.
  • Nebula Algorithm, built on Nebula Spark Connector and GraphX, is also an application on Spark Lib and Spark to run common graph algorithms (pagerank, LPA, etc.) on Nebula Graph diagrams.

Nebula Spark Connector

NebulaGraph Spark Reader

In order to read data from NebulaGraph, such as vertex, Nebula Spark Connector will scan all Nebula StorageD s with a given TAG, such as scanning players for TAG: withLabel("player"), and we can also specify the properties of vertex: withReturnCols(List("name", "age").

After specifying all TAG-related configuration for reading, call spark. Read. Nebula. Load VerticesToDF returns graph data converted to Dataframe after scanning Nebula Graph, like this:

  def readVertex(spark: SparkSession): Unit = {
    LOG.info("start to read nebula vertices")
    val config =
    val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
      .withReturnCols(List("name", "age"))
    val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
    println("vertex count: " + vertex.count())

Written examples are not listed here, but there are more detailed examples in the links to the code examples given earlier. It is worth mentioning here that Spark Connector reads data to satisfy a large number of data scenarios for graph analysis, graph calculation, and very different from most other clients by bypassing GraphD and scanning MetaD and StorageD for data. However, in the case of writing, the nGQL DML statement is initiated by GraphD.

Let's start with a hands-on exercise.

Beginner Nebula Spark Connector

Prerequisites: Assume the following program is running on a Linux machine with an Internet connection, preferably with Docker and Docker-Compose pre-installed.

Pull up the environment

First, let's use Nebula-Up Deploy container-based Nebula Graph Core v3, Nebula Studio, Nebula Console and Park, Hadoop environments and try to install Docker and Docker-Compose for us if they are not already installed.

# Install Core with Spark Connector, Nebula Algorithm, Nebula Exchange
curl -fsSL nebula-up.siwei.io/all-in-one.sh | bash -s -- v3 spark

You know what? Nebula-UP You can load more at one click if your environment is a bit larger (like 8 GB RAM) curl-fsSL nebula-up. Siwei. Io/all-in-one. SH | bash can hold more, but please note Nebula-UP Not for the production environment.

After executing the above side script, let's use Nebula-Console (Nebula Graph's command line client) to connect to it.

# Connect to nebula with console
# Execute any queryies like
~/.nebula-up/console.sh -e "SHOW HOSTS"

Load a copy of the data in and execute a graph query:

# Load the sample dataset
# Wait about a minute

# Make a Graph Query the sample dataset
~/.nebula-up/console.sh -e 'USE basketballplayer; FIND ALL PATH FROM "player100" TO "team204" OVER * WHERE follow.degree is EMPTY or follow.degree >=0 YIELD path AS p;'

Enter the Spark environment

By doing this, we can enter the Spark environment:

docker exec -it spark_master_1 bash

If we want to compile, we can install mvn inside:

docker exec -it spark_master_1 bash
# in the container shell

export MAVEN_VERSION=3.5.4
export MAVEN_HOME=/usr/lib/mvn

wget http://archive.apache.org/dist/maven/maven-3/$MAVEN_VERSION/binaries/apache-maven-$MAVEN_VERSION-bin.tar.gz && \
  tar -zxvf apache-maven-$MAVEN_VERSION-bin.tar.gz && \
  rm apache-maven-$MAVEN_VERSION-bin.tar.gz && \
  mv apache-maven-$MAVEN_VERSION /usr/lib/mvn

Example of running Spark Connector

Option 1 (recommended): via PySpark
  • Enter PySpark Shell
  • Call Nebula Spark Reader
# call Nebula Spark Connector Reader
df = spark.read.format(
    "type", "vertex").option(
    "spaceName", "basketballplayer").option(
    "label", "player").option(
    "returnCols", "name,age").option(
    "metaAddress", "metad0:9559").option(
    "partitionNumber", 1).load()

# show the dataframe with limit of 2
  • Return result example
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.5

Using Python version 2.7.16 (default, Jan 14 2020 07:22:06)
SparkSession available as 'spark'.
>>> df = spark.read.format(
...   "com.vesoft.nebula.connector.NebulaDataSource").option(
...     "type", "vertex").option(
...     "spaceName", "basketballplayer").option(
...     "label", "player").option(
...     "returnCols", "name,age").option(
...     "metaAddress", "metad0:9559").option(
...     "partitionNumber", 1).load()
>>> df.show(n=2)
|_vertexId|          name|age|
|player105|   Danny Green| 31|
|player109|Tiago Splitter| 34|
only showing top 2 rows
Option 2: Compile, submit sample JAR packages
  • First clone the Spark Connector and the code repository for its sample code, then compile:

Note that we used the master branch because the master branch is now compatible with 3.x, make sure that the spark connector matches the database kernel version, which corresponds to the README of the relational reference code repository. MD.

cd ~/.nebula-up/nebula-up/spark
git clone https://github.com/vesoft-inc/nebula-spark-connector.git

docker exec -it spark_master_1 bash
cd /root/nebula-spark-connector
  • Replace the code for the sample project
echo > example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala

vi example/src/main/scala/com/vesoft/nebula/examples/connector/NebulaSparkReaderExample.scala
  • Paste in the following code, where we read the graph loaded in front: vertices and edges on the basketballplayer: calls readVertex and readEdges, respectively.
package com.vesoft.nebula.examples.connector

import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.connector.NebulaDataFrameReader
import com.vesoft.nebula.connector.{NebulaConnectionConfig, ReadNebulaConfig}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object NebulaSparkReaderExample {

  private val LOG = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val spark = SparkSession



  def readVertex(spark: SparkSession): Unit = {
    LOG.info("start to read nebula vertices")
    val config =
    val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
      .withReturnCols(List("name", "age"))
    val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()
    println("vertex count: " + vertex.count())

  def readEdges(spark: SparkSession): Unit = {
    LOG.info("start to read nebula edges")

    val config =
    val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
    val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
    println("edge count: " + edge.count())

  • Then pack it into a JAR package
/usr/lib/mvn/bin/mvn install -Dgpg.skip -Dmaven.javadoc.skip=true -Dmaven.test.skip=true
  • Finally, submit it it to Spark for execution:
cd example

/spark/bin/spark-submit --master "local" \
    --class com.vesoft.nebula.examples.connector.NebulaSparkReaderExample \
    --driver-memory 4g target/example-3.0-SNAPSHOT.jar

# Exit spark Container
  • Once we succeed, we will get the results back:
22/04/19 07:29:34 INFO DAGScheduler: Job 1 finished: show at NebulaSparkReaderExample.scala:57, took 0.199310 s
|_vertexId|              name|age|
|player105|       Danny Green| 31|
|player109|    Tiago Splitter| 34|
|player111|        David West| 38|
|player118| Russell Westbrook| 30|
|player143|Kristaps Porzingis| 23|
|player114|     Tracy McGrady| 39|
|player150|       Luka Doncic| 20|
|player103|          Rudy Gay| 32|
|player113|   Dejounte Murray| 29|
|player121|        Chris Paul| 33|
|player128|   Carmelo Anthony| 34|
|player130|       Joel Embiid| 25|
|player136|        Steve Nash| 45|
|player108|        Boris Diaw| 36|
|player122|    DeAndre Jordan| 30|
|player123|       Ricky Rubio| 28|
|player139|        Marc Gasol| 34|
|player142|     Klay Thompson| 29|
|player145|      JaVale McGee| 31|
|player102| LaMarcus Aldridge| 33|
only showing top 20 rows

22/04/19 07:29:36 INFO DAGScheduler: Job 4 finished: show at NebulaSparkReaderExample.scala:82, took 0.135543 s
|   _srcId|   _dstId|_rank|degree|
|player105|player100|    0|    70|
|player105|player104|    0|    83|
|player105|player116|    0|    80|
|player109|player100|    0|    80|
|player109|player125|    0|    90|
|player118|player120|    0|    90|
|player118|player131|    0|    90|
|player143|player150|    0|    90|
|player114|player103|    0|    90|
|player114|player115|    0|    90|
|player114|player140|    0|    90|
|player150|player120|    0|    80|
|player150|player137|    0|    90|
|player150|player143|    0|    90|
|player103|player102|    0|    70|
|player113|player100|    0|    99|
|player113|player101|    0|    99|
|player113|player104|    0|    99|
|player113|player105|    0|    99|
|player113|player106|    0|    99|
only showing top 20 rows

In fact, there are many more examples under this code repository, especially GraphX For example, you can try to explore this section yourself.

Note that in GraphX, vertex IDs are assumed to be numeric, so for string type vertex IDs, a real-time conversion is required, see Examples in Nebula Algorithom To learn how to bypass this problem.

Nebula Exchange

Nebula Exchange is a Spark Lib and a Spark application that can submit execution directly. It is used to read data from multiple data sources, write data to Nebula Graph, or output Nebula Graph SST file.

Using Nebula Exchange via spark-submit is straightforward:

  • Create a configuration file first to let Exchange know how to get and write data
  • Then call the Exchange package with the specified profile

Now, let's do a real-world test with the same environment we created in the previous chapter.

One Click to try Exchange

Let's run and see first

Please refer to the front Pull up the environment In this chapter, first install the environment with one click.

One-click execution:


Congratulations, you have successfully performed an Exchange data import task for the first time!

Take a look at some more details

In this example, we are actually using Exchange to write data to the NebulaGraph cluster from a CSV file, one of the supported data sources. The first column in this CSV file is the vertex ID, and the second and third columns are attributes of Name and Age:

player800,"Foo Bar",23
player801,"Another Name",21
  • Let's take a look at the Spark environment
docker exec -it spark_master_1 bash
cd /root
  • You can see the profile exchange that we specified when we submitted the Exchange task. Conf is a file in HOCON format:

    • In. Information about the nebula Graph cluster is described in nebula
    • In. The tags describe how to map the required fields to our data source (CSV file here) and other information about Vertecies.
  # Spark relation config
  spark: {
    app: {
      name: Nebula Exchange


    driver: {
      cores: 1
      maxResultSize: 1G

    executor: {
        memory: 1G

      max: 16

  # Nebula Graph relation config
  nebula: {
      meta:["metad0:9559", "metad1:9559", "metad2:9559"]
    user: root
    pswd: nebula
    space: basketballplayer

    # parameters for SST import, not required
        hdfs.namenode: "hdfs://localhost:9000"

    # nebula client connection parameters
    connection {
      # socket connect & execute timeout, unit: millisecond
      timeout: 30000

    error: {
      # max number of failures, if the number of failures is bigger than max, then exit the application.
      max: 32
      # failed import job will be recorded in output path
      output: /tmp/errors

    # use google's RateLimiter to limit the requests send to NebulaGraph
    rate: {
      # the stable throughput of RateLimiter
      limit: 1024
      # Acquires a permit from RateLimiter, unit: MILLISECONDS
      # if it can't be obtained within the specified timeout, then give up the request.
      timeout: 1000

  # Processing tags
  # There are tag config examples for different dataSources.
  tags: [

    # HDFS csv
    # Import mode is client, just change type.sink to sst if you want to use client import mode.
      name: player
      type: {
        source: csv
        sink: client
      path: "file:///root/player.csv"
      # if your csv file has no header, then use _c0,_c1,_c2,.. to indicate fields
      fields: [_c1, _c2]
      nebula.fields: [name, age]
      vertex: {
      separator: ","
      header: false
      batch: 256
      partition: 32

  • We should see that the CSV data source and this profile are in the same directory:
bash-5.0# ls -l
total 24
drwxrwxr-x    2 1000     1000          4096 Jun  1 04:26 download
-rw-rw-r--    1 1000     1000          1908 Jun  1 04:23 exchange.conf
-rw-rw-r--    1 1000     1000          2593 Jun  1 04:23 hadoop.env
drwxrwxr-x    7 1000     1000          4096 Jun  6 03:27 nebula-spark-connector
-rw-rw-r--    1 1000     1000            51 Jun  1 04:23 player.csv
  • Then, in fact, we can manually submit this Exchange task again
/spark/bin/spark-submit --master local \
    --class com.vesoft.nebula.exchange.Exchange download/nebula-exchange.jar \
    -c exchange.conf
  • Partial Return Results
22/06/06 03:56:26 INFO Exchange$: Processing Tag player
22/06/06 03:56:26 INFO Exchange$: field keys: _c1, _c2
22/06/06 03:56:26 INFO Exchange$: nebula keys: name, age
22/06/06 03:56:26 INFO Exchange$: Loading CSV files from file:///root/player.csv
22/06/06 03:56:41 INFO Exchange$: import for tag player cost time: 3.35 s
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchSuccess.player: 2
22/06/06 03:56:41 INFO Exchange$: Client-Import: batchFailure.player: 0

For more data sources, see the documentation and configuration examples.

For practice with Exchange output SST files, you can refer to the documentation and my old articles Nebula Exchange SST 2.x Practice Guide.

Nebula Algorithm

Submit tasks through spark-submit

I am here This code repository Here's an example. Today we can experience it more easily with Nebula-UP.

Reference Front Pull up the environment In this chapter, first install the environment with one click.

After deploying the required dependencies through Nebula-UP's Spark mode as above

  • Executes a PageRank algorithm on the LiveJournal dataset and outputs the results to a CSV file
  • Check the output:
docker exec -it spark_master_1 bash

head /output/part*000.csv

Profile Interpretation

Complete file in Here Here, we will introduce the main fields:

  • . Data specifies that the source is Nebula, meaning graph data is retrieved from the cluster, and the output sink is csv, meaning it is written to a local file.
  data: {
    # data source. optional of nebula,csv,json
    source: nebula
    # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text
    sink: csv
    # if your algorithm needs weight
    hasWeight: false
  • . nebula.read specifies the relationship for reading the NebulaGraph cluster, where the edge data for all edge types: follows is read as a whole graph
  nebula: {
    # algo's data source from Nebula. If data.source is nebula, then this nebula.read config can be valid.
    read: {
        # Nebula metad server address, multiple addresses are split by English comma
        metaAddress: "metad0:9559"
        # Nebula space
        space: livejournal
        # Nebula edge types, multiple labels means that data from multiple edges will union together
        labels: ["follow"]
        # Nebula edge property name for each edge type, this property will be as weight col for algorithm.
        # Make sure the weightCols are corresponding to labels.
        weightCols: []
  • . The algorithm configures the algorithm we're calling, and the configuration of the algorithm
  algorithm: {
    executeAlgo: pagerank

    # PageRank parameter
    pagerank: {
        maxIter: 10
        resetProb: 0.15  # default 0.15

Call Nebula Algoritm as a library in Spark

Note, on the other hand, that we can call Nebula Algoritm as a library with the benefit of:

  • More control/customization of the output format of the algorithm
  • You can convert non-digital ID s, see Here

I won't give you an example here. If you're interested in asking Nebula-UP for help, I'll add another one.

AC Diagram Database Technology? Join Nebula Exchange Group first Fill in your Nebula business card , Nebula's assistant will pull you into the group~~

Tags: Database

Posted by bleh on Mon, 18 Jul 2022 17:34:50 +0930