RDMA Improves Alluxio (Tachyon) Remote Read Bandwidth and CPU Utilization by up to 50%


    This post presents benchmark results comparing TCP and RDMA connectivity in Tachyon. It assumes familiarity with RDMA and Alluxio (previously named Tachyon).

     

    Note: Tachyon has since been renamed Alluxio. See Alluxio - Memory Speed Virtual Distributed Storage.

     

    Tachyon is a memory-centric distributed storage system that enables reliable data sharing at memory speed across cluster frameworks such as MapReduce. It achieves high performance by leveraging lineage information and using memory aggressively. Tachyon caches working-set files in memory so that different jobs, queries, and frameworks can access cached files at memory speed. The Tachyon over RDMA solution is currently in the final stages of review by the Tachyon community and is expected to be accepted upstream shortly. For more information about Tachyon click here.

     

    The TCP code is part of the Tachyon package and includes two implementations: Java NIO and Netty. The RDMA code is based on the JXIO library, a Java API over AccelIO (a C library). AccelIO (http://www.accelio.org/) is a high-performance asynchronous reliable messaging and RPC library optimized for hardware acceleration. It implements RDMA and TCP/IP transports, and other transports, such as shared memory, can take advantage of the same efficient and convenient API.

     

    The RDMA code for Tachyon consists of two patches:

    1. The first patch, which is already merged upstream, expands the remote read infrastructure to allow pluggable clients and servers with different transport protocols (click here).

    2. The second patch uses the new infrastructure and adds RDMA client and server (click here).
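
The pluggable design described by the two patches can be sketched as follows. This is only an illustration of the idea, not the actual Tachyon code: the `BlockReader` interface, the `TcpBlockReader` and `RdmaBlockReader` classes, the `createReader` helper, and the `sketch.reader.class` property are all hypothetical names. The sketch shows just one thing, selecting a transport implementation by class name through reflection.

```java
// Hypothetical sketch: none of these names come from the Tachyon code base.
class PluggableReaderSketch {
  // Instantiates the reader whose class name is given, e.g. from a configuration property.
  static BlockReader createReader(String className) {
    try {
      Class<?> cls = Class.forName(className);
      return (BlockReader) cls.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot load reader " + className, e);
    }
  }

  public static void main(String[] args) {
    // Falls back to the TCP reader when no class name is configured.
    String configured =
        System.getProperty("sketch.reader.class", TcpBlockReader.class.getName());
    System.out.println("Using transport: " + createReader(configured).transport());
  }
}

// Common contract that every transport-specific reader implements.
interface BlockReader {
  String transport();
}

class TcpBlockReader implements BlockReader {
  public String transport() {
    return "TCP";
  }
}

class RdmaBlockReader implements BlockReader {
  public String transport() {
    return "RDMA";
  }
}
```

With this kind of structure, adding a new transport means adding one class and changing one configuration value, which is the property the first patch is described as enabling.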

     

    Tachyon + RDMA Block Diagram

     

    tachyon.jpg

     


     

    The Benchmark

    The benchmark measures remote reads of files of different sizes by several clients, recording the average time to read one file and the aggregate client and server CPU usage.

    The benchmark was run over a 40Gb/s InfiniBand (QDR) network, either with RDMA enabled or over TCP only.

     

    The Benchmark test can be found at the end of this post and here.

     

    The block size is fixed at 1GB for all file sizes.

    For each run, the number of clients, number of files and the file sizes were changed.

    • Number of clients: 1, 4
    • Number of files: 1, 20
    • File size: 50MB, 100MB, 500MB, 1GB, 2GB, [5GB, 10GB, 40GB]

     

    Note: The bracketed file sizes apply only to the first test (see graphs below).

     

    Each cycle is run with TCP (Netty or NIO) or with RDMA, which enables the acceleration for Tachyon.

    For each configuration, two samples were run and the results were averaged.
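
The run matrix and the two-sample averaging can be sketched as below. This is a sketch under assumptions: the post does not say that every combination was run, so the full cross product here is a guess, only the non-bracketed file sizes are included, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: enumerates benchmark configurations and averages two samples.
class RunMatrixSketch {
  static final int[] CLIENTS = {1, 4};
  static final int[] FILES = {1, 20};
  static final String[] SIZES = {"50MB", "100MB", "500MB", "1GB", "2GB"};
  static final String[] TRANSPORTS = {"NIO", "Netty", "RDMA"};

  // Builds one label per (transport, clients, files, size) combination.
  static List<String> configurations() {
    List<String> result = new ArrayList<>();
    for (String transport : TRANSPORTS) {
      for (int clients : CLIENTS) {
        for (int files : FILES) {
          for (String size : SIZES) {
            result.add(transport + "/" + clients + " clients/" + files + " files/" + size);
          }
        }
      }
    }
    return result;
  }

  // Each configuration is sampled twice; the reported value is the mean.
  static double average(double firstSample, double secondSample) {
    return (firstSample + secondSample) / 2.0;
  }

  public static void main(String[] args) {
    for (String configuration : configurations()) {
      System.out.println(configuration);
    }
  }
}
```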

     

    Setup

    • 2 hosts
    • OS: Red Hat 6.4
    • RAM: 48GB
    • CPU: Dual socket/ 8 cores/ x86_64
    • NIC: ConnectX-3
    • Network: InfiniBand QDR 40Gb/s
    • InfiniBand Switch: SX6036 (SW version 3.3.3000)
    • MLNX_OFED Linux Driver

     

    Configuration

     

    Specific configuration for Tachyon:

    • tachyon.user.remote.read.buffer.size.byte = 1073741824
    • tachyon.worker.data.server.class = tachyon.worker.rdma.RDMADataServer
    • tachyon.user.remote.block.reader.class = tachyon.client.rdma.RDMARemoteBlockReader
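
As a rough illustration, the three settings above could be rendered as JVM `-D` flags. This assumes the properties are supplied as Java system properties, which should be checked against the Tachyon version in use. The class and method names are hypothetical. The sketch also makes explicit that the read buffer size, 1073741824 bytes, equals 1GB (2^30 bytes), the same as the block size used in the benchmark.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: collects the RDMA-related settings and prints them as -D flags.
class RdmaConfigSketch {
  static Map<String, String> rdmaProperties() {
    Map<String, String> props = new LinkedHashMap<>();
    // 1L << 30 == 1073741824 bytes, i.e. 1GB.
    props.put("tachyon.user.remote.read.buffer.size.byte", Long.toString(1L << 30));
    props.put("tachyon.worker.data.server.class", "tachyon.worker.rdma.RDMADataServer");
    props.put("tachyon.user.remote.block.reader.class",
        "tachyon.client.rdma.RDMARemoteBlockReader");
    return props;
  }

  // Assumption: the settings are passed to the JVM as system properties.
  static String toJvmFlags(Map<String, String> props) {
    StringBuilder flags = new StringBuilder();
    for (Map.Entry<String, String> entry : props.entrySet()) {
      if (flags.length() > 0) {
        flags.append(' ');
      }
      flags.append("-D").append(entry.getKey()).append('=').append(entry.getValue());
    }
    return flags.toString();
  }

  public static void main(String[] args) {
    System.out.println(toJvmFlags(rdmaProperties()));
  }
}
```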

     

    Command line:

    • write files: java -cp tachyon-<version>-jar-with-dependencies.jar tachyon.examples.ReadTest <master_ip:port> write <comma separated list of files> <number of bytes per file>
    • read files from a different machine: java -cp core/target/tachyon-<version>-jar-with-dependencies.jar tachyon.examples.ReadTest <master_ip:port> read <comma separated list of files> <number of bytes per file>
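
The commands take the file size as a plain byte count. A small helper like the one below can convert the sizes listed earlier into that argument. It is a sketch with a hypothetical class name, and it assumes binary units (1MB = 2^20 bytes, 1GB = 2^30 bytes), which matches the 1073741824-byte buffer setting but is not stated explicitly in the post.

```java
// Hypothetical helper: converts sizes such as "50MB" or "2GB" into a byte count.
class FileSizeSketch {
  static long toBytes(String size) {
    String normalized = size.trim().toUpperCase();
    long multiplier;
    if (normalized.endsWith("GB")) {
      multiplier = 1L << 30; // assumption: binary gigabytes
    } else if (normalized.endsWith("MB")) {
      multiplier = 1L << 20; // assumption: binary megabytes
    } else {
      throw new IllegalArgumentException("Unsupported size: " + size);
    }
    String digits = normalized.substring(0, normalized.length() - 2).trim();
    return Long.parseLong(digits) * multiplier;
  }

  public static void main(String[] args) {
    for (String size : new String[] {"50MB", "100MB", "500MB", "1GB", "2GB"}) {
      System.out.println(size + " = " + toBytes(size) + " bytes");
    }
  }
}
```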

     

     

    Results

     

    The graphs show Time [msec] or CPU Usage as a function of file size.

    The CPU Usage value is calculated as CPU utilization (in percent) x time (in seconds).
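
The CPU Usage metric and a relative improvement between two runs can be computed as in the sketch below. The class and method names are hypothetical, and the numbers in `main` are made up purely to show the arithmetic. They are not benchmark results.

```java
// Hypothetical helper for the CPU Usage metric: utilization (percent) x time (seconds).
class CpuUsageSketch {
  static double cpuUsage(double utilizationPercent, double seconds) {
    return utilizationPercent * seconds;
  }

  // Relative improvement of the accelerated run over the baseline, in percent.
  static double improvementPercent(double baseline, double accelerated) {
    return (baseline - accelerated) / baseline * 100.0;
  }

  public static void main(String[] args) {
    // Made-up inputs for illustration only.
    double tcp = cpuUsage(80.0, 10.0);
    double rdma = cpuUsage(50.0, 8.0);
    System.out.println("TCP usage: " + tcp + ", RDMA usage: " + rdma);
    System.out.println("Improvement: " + improvementPercent(tcp, rdma) + "%");
  }
}
```

Because the metric multiplies utilization by duration, a run can score better either by finishing sooner or by keeping the CPU less busy while it runs.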

     

     

    Click here for the detailed results.

     

     

       

     

     

    Conclusions

     

    • Up to 50% shorter remote file read duration for a single-client read
    • Up to 50% lower CPU usage for multiple simultaneous client reads
    • The performance improvement grows with file size (RDMA connection establishment takes longer than TCP, which reduces the gain for smaller files)
    • The performance improvement grows when multiple files are read remotely (an already established RDMA connection is reused via 'connection caching')
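
The 'connection caching' effect mentioned in the last point can be illustrated with a minimal sketch. It is not the JXIO or Tachyon implementation: the `ConnectionCacheSketch` and `Connection` classes and the worker address are hypothetical, and creating a `Connection` object merely stands in for the costly RDMA connection setup.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: reuses one connection per remote address instead of reconnecting.
class ConnectionCacheSketch {
  // Stand-in for an established connection to a remote worker.
  static class Connection {
    final String address;

    Connection(String address) {
      this.address = address;
    }
  }

  private final Map<String, Connection> cache = new HashMap<>();
  private int established = 0;

  // Returns the cached connection, establishing a new one only on first use.
  Connection get(String address) {
    Connection connection = cache.get(address);
    if (connection == null) {
      connection = new Connection(address); // the expensive step in a real system
      established++;
      cache.put(address, connection);
    }
    return connection;
  }

  int establishedCount() {
    return established;
  }

  public static void main(String[] args) {
    ConnectionCacheSketch cache = new ConnectionCacheSketch();
    for (int file = 0; file < 20; file++) {
      cache.get("worker-1:29998"); // hypothetical worker address
    }
    System.out.println("Connections established: " + cache.establishedCount());
  }
}
```

Reading 20 files from the same worker pays the setup cost once, which is consistent with the larger gains reported for multi-file reads.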

     

    Benchmark Test Example

     

    To run this test, perform the following steps:

    1. Download Tachyon

    2. Add the benchmark test below (ReadTest.java) as /tachyon/core/src/main/java/tachyon/examples/ReadTest.java

    3. Compile the Tachyon package

    4. Start Tachyon

    5. Run the benchmark test

     

    package tachyon.examples;

    import java.io.IOException;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import tachyon.Constants;
    import tachyon.Version;
    import tachyon.client.InStream;
    import tachyon.client.OutStream;
    import tachyon.client.ReadType;
    import tachyon.client.TachyonFS;
    import tachyon.client.TachyonFile;
    import tachyon.client.WriteType;

    public class ReadTest {
      private static final Logger LOG = LoggerFactory.getLogger(Constants.LOGGER_TYPE);

      private static TachyonFS sTachyonClient;
      private static String sOperation = null;
      private static long sBytes = 0;
      private static boolean sCheckData = false;
      private static String[] sFiles = null;

      public static void main(String[] args) throws IOException {
        if (args.length < 4) {
          // Print usage and exit when required arguments are missing
          System.out
              .println("java -cp target/tachyon-"
                  + Version.VERSION
                  + "-jar-with-dependencies.jar "
                  + "tachyon.examples.ReadTest <TachyonMasterAddress>"
                  + " <Operation> <FileList_CommaSeparated> <FileBytes> <Optional_CheckData>");
          System.exit(-1);
        }
        sTachyonClient = TachyonFS.get(args[0]);
        sOperation = args[1];
        sFiles = args[2].split(",");
        sBytes = Long.parseLong(args[3]);
        if (args.length == 5) {
          sCheckData = Boolean.parseBoolean(args[4]);
        }
        LOG.info("Going to " + sOperation + " " + sFiles.length + " files with " + sBytes + " bytes, "
            + args[2]);
        // Any operation other than "read" creates and writes the files
        if (sOperation.equals("read")) {
          readFiles();
        } else {
          createFiles();
          writeFiles();
        }
        sTachyonClient.close();
        System.exit(0);
      }

      public static void createFiles() throws IOException {
        for (int i = 0; i < sFiles.length; i ++) {
          int fileId = sTachyonClient.createFile(sFiles[i]);
          LOG.info("File " + sFiles[i] + " created with id " + fileId);
        }
      }

      public static void writeFiles() throws IOException {
        for (int i = 0; i < sFiles.length; i ++) {
          TachyonFile file = sTachyonClient.getFile(sFiles[i]);
          OutStream os = file.getOutStream(WriteType.MUST_CACHE);
          // Write a repeating byte pattern so that reads can be validated
          for (long k = 0; k < sBytes; k ++) {
            os.write((byte) k);
          }
          os.close();
          LOG.info("Write to file " + sFiles[i] + " " + sBytes + " bytes");
        }
      }

      public static void readFiles() throws IOException {
        try {
          for (int i = 0; i < sFiles.length; i ++) {
            long time = 0;
            int read = 0;
            TachyonFile file = sTachyonClient.getFile(sFiles[i]);
            LOG.info("Going to read file " + sFiles[i] + " size " + file.length());
            // Measure the elapsed time of the whole read in nanoseconds
            time = System.nanoTime();
            InStream is = file.getInStream(ReadType.NO_CACHE);
            if (sCheckData) {
              // Read byte by byte and compare with the pattern written by writeFiles()
              LOG.info("validating data");
              long k = 0;
              while (read != -1) {
                read = is.read();
                if (read != -1 && (byte) read != (byte) k) {
                  LOG.error("ERROR IN TEST, got " + (byte) read + " expected " + (byte) k);
                  is.close();
                  return;
                }
                k ++;
              }
            } else {
              // Read in 4KB chunks until end of stream, discarding the data
              byte[] buf = new byte[4 * 1024];
              while (read != -1) {
                read = is.read(buf);
              }
            }
            time = System.nanoTime() - time;
            // Release the stream outside the timed region
            is.close();
            System.out.println("Finished reading file " + sFiles[i] + " time " + time + " \n");
          }
        } catch (Exception e) {
          LOG.error("got Exception in test " + e.toString());
          StackTraceElement[] stackTraceElements = e.getStackTrace();
          for (StackTraceElement stackTrace : stackTraceElements) {
            LOG.error("\t" + stackTrace.toString());
          }
          System.exit(1);
        }
      }
    }
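
ReadTest prints the elapsed read time in nanoseconds, while the results are plotted in msec. A small post-processing helper such as the following can convert the printed value and derive a throughput figure. It is a sketch with hypothetical names, and it assumes binary megabytes (2^20 bytes).

```java
// Hypothetical post-processing helper for the times printed by ReadTest.
class ReadTimingSketch {
  // Converts a System.nanoTime() difference into milliseconds.
  static double toMillis(long elapsedNanos) {
    return elapsedNanos / 1_000_000.0;
  }

  // Throughput in MB/s, assuming 1MB = 2^20 bytes.
  static double throughputMBps(long bytes, long elapsedNanos) {
    double megabytes = bytes / (double) (1L << 20);
    double seconds = elapsedNanos / 1_000_000_000.0;
    return megabytes / seconds;
  }

  public static void main(String[] args) {
    long bytes = Long.parseLong(args[0]);        // file size in bytes
    long elapsedNanos = Long.parseLong(args[1]); // time value printed by ReadTest
    System.out.println(toMillis(elapsedNanos) + " msec, "
        + throughputMBps(bytes, elapsedNanos) + " MB/s");
  }
}
```
<test>
assert ReadTimingSketch.toMillis(1_500_000_000L) == 1500.0;
assert ReadTimingSketch.throughputMBps(1L << 30, 1_000_000_000L) == 1024.0;
assert ReadTimingSketch.throughputMBps(1L << 20, 2_000_000_000L) == 0.5;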