SLEEPy

From CDOT Wiki
Revision as of 17:38, 11 April 2016 by Jsiu3



Intel Data Analytics Acceleration Library (DAAL)

Team Members

  1. Luong Chuong
  2. Jacky Siu
  3. Woodson Delhia

Introduction

What is Big Data

Big Data refers to data sets so large that traditional data-processing methods cannot keep up. Much of this data relates to human behavior that computers can measure, such as trends and web analytics. Because so many people use technology, the amount of data gathered this way is massive, which is one of the reasons for the rise of Big Data.

What is DAAL

DAAL is a C++ and Java/Scala library for data analytics. It is similar to MKL (Math Kernel Library), with some differences:

  • MKL focuses on computation, while DAAL covers the entire data flow (acquisition, transformation, processing).
  • DAAL is optimized for all kinds of Intel-based devices (from data centers to home computers).


DAAL supports three processing modes:

  • Offline Processing (Batch) - The data fits in memory and is processed all at once.
  • Online Processing (Streaming) - The data is too big for memory, so DAAL processes it in chunks and combines the partial results into the final result.
  • Distributed Processing - The data is split across several nodes; each node computes a partial result and the partial results are merged. DAAL does not tie you to a communication method and leaves that choice to the developer (Hadoop, Spark, MPI, etc.).
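The partial-result idea behind the online and distributed modes can be sketched without DAAL: each chunk produces a small partial result, partial results are merged, and a final step turns the merged state into the answer. The sketch below is plain C++, not the DAAL API, and computes a mean; `PartialMean`, `processChunk`, `combine`, `finalizeMean` and `batchMean` are hypothetical names chosen for illustration.

```cpp
#include <cstddef>
#include <vector>

// Partial result for one chunk: just enough state to merge later.
struct PartialMean {
    double sum = 0.0;
    std::size_t count = 0;
};

// Online / distributed local step: process one chunk of data.
PartialMean processChunk(const std::vector<double>& chunk) {
    PartialMean p;
    for (double x : chunk) {
        p.sum += x;
    }
    p.count = chunk.size();
    return p;
}

// Merge two partial results (the order of merging does not matter).
PartialMean combine(const PartialMean& a, const PartialMean& b) {
    PartialMean r;
    r.sum = a.sum + b.sum;
    r.count = a.count + b.count;
    return r;
}

// Finalize: turn the merged partial result into the final answer.
double finalizeMean(const PartialMean& p) {
    return p.count == 0 ? 0.0 : p.sum / static_cast<double>(p.count);
}

// Batch mode for comparison: all data in memory, processed at once.
double batchMean(const std::vector<double>& data) {
    return finalizeMean(processChunk(data));
}
```

The same `combine` step works whether chunks arrive one after another (online) or are computed on different nodes (distributed), because merging partial sums and counts does not depend on where or when each chunk was processed.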


DAAL Data Flow.

How DAAL Works

DaalModel.png



DAALDataSet.PNG



DAALDataflow.PNG

Code Examples

Batch Sorting

CSV Data:

-55.558252,63.051427,-27.793776
-75.622534,61.212279,-16.283311
-86.747071,-28.132241,-17.824316
-34.172101,-51.404172,14.670925
-61.506308,48.248030,-99.235341
9.746765,-89.879258,55.561778
48.896723,-32.648097,48.313603
-15.346015,9.769776,-33.483281
56.726081,-87.272631,8.724224
-1.926802,54.960580,-78.723429
45.237223,-79.764218,-47.271926
84.138339,11.547818,-92.962952
46.711824,-42.623510,-34.664673
55.813112,19.803475,4.807766
-55.474098,-72.163755,89.425736
-7.566596,-77.829218,58.630172
-76.081937,-12.089445,-44.065054
-25.888944,46.425499,-37.515164
-30.201387,-16.237217,-50.716591
-88.085869,60.136249,54.812866

Code:

/* file: sorting_batch.cpp
* Copyright 2014-2016 Intel Corporation All Rights Reserved.*/

#include "daal.h"
#include "service.h"

using namespace daal;
using namespace daal::algorithms;
using namespace daal::data_management;
using namespace std;

/* Input data set parameters */
string datasetFileName = "../data/batch/sorting.csv";

int main(int argc, char *argv[])
{
    checkArguments(argc, argv, 1, &datasetFileName);

    /* Initialize FileDataSource<CSVFeatureManager> to retrieve the input data from a .csv file */
    FileDataSource<CSVFeatureManager> dataSource(datasetFileName, DataSource::doAllocateNumericTable, DataSource::doDictionaryFromContext);

    /* Retrieve the data from the input file */
    dataSource.loadDataBlock();

    /* Create an algorithm object to sort data using the default (radix) method */
    sorting::Batch<> algorithm;

    /* Print the input observations matrix */
    printNumericTable(dataSource.getNumericTable(), "Initial matrix of observations:");

    /* Set input objects for the algorithm */
    algorithm.input.set(sorting::data, dataSource.getNumericTable());

    /* Sort data observations */
    algorithm.compute();

    /* Get the sorting result */
    services::SharedPtr<sorting::Result> res = algorithm.getResult();

    printNumericTable(res->get(sorting::sortedData), "Sorted matrix of observations:");

    return 0;
}

Results:

DAAL Sort Batch.

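Each column (feature) is sorted independently from smallest to largest, so the values in a sorted row no longer necessarily come from the same original observation.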
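To make "per column" concrete, here is a plain C++ sketch (not DAAL code) of the same behavior on a small row-major matrix. `sortEachColumn` and `Matrix` are hypothetical names, the sketch assumes every row has the same number of columns, and it uses `std::sort` rather than the radix sort of DAAL's default method.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Row-major matrix: each inner vector is one observation (row).
using Matrix = std::vector<std::vector<double>>;

// Sort every column (feature) independently in ascending order.
// Values in an output row generally no longer come from the same input row.
Matrix sortEachColumn(const Matrix& rows) {
    Matrix out = rows;
    if (out.empty()) return out;
    const std::size_t nCols = out[0].size();
    for (std::size_t c = 0; c < nCols; ++c) {
        // Copy column c out, sort it, then write it back in order.
        std::vector<double> column;
        column.reserve(out.size());
        for (const auto& row : out) column.push_back(row[c]);
        std::sort(column.begin(), column.end());
        for (std::size_t r = 0; r < out.size(); ++r) out[r][c] = column[r];
    }
    return out;
}
```

Applied to the first three rows of the sample CSV, the first output row becomes -86.747071, -28.132241, -27.793776, a combination that never appeared together in the input.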

Data Blocks:

dataSource.loadDataBlock(5);
//dataSource.loadDataBlock(5);
dataSource.loadDataBlock(5);
dataSource.loadDataBlock(5);
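DAAL Sort Block. DAAL Sort Block.

"Blocks" of data are loaded 5 rows at a time: each call to loadDataBlock(5) reads the next 5 rows of the file. This makes it easy to section off part of the data for processing, and it is also one way to hand out pieces of the data when distributing the work with MPI or similar tools.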
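The block-by-block reading can be sketched in plain C++ without DAAL: each call reads at most `maxRows` rows and continues where the previous call stopped, so a 20-row file loaded 5 rows at a time yields four blocks. `loadBlock` is a hypothetical helper, not `FileDataSource::loadDataBlock`, and it assumes a header-less CSV of numbers like the sample data.

```cpp
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

// Read up to maxRows CSV lines from `in`, continuing from where the previous
// call stopped. Returns the rows read; an empty result means end of input.
std::vector<std::vector<double>> loadBlock(std::istream& in, std::size_t maxRows) {
    std::vector<std::vector<double>> block;
    std::string line;
    while (block.size() < maxRows && std::getline(in, line)) {
        if (line.empty()) continue;  // skip blank lines
        std::vector<double> row;
        std::stringstream fields(line);
        std::string field;
        while (std::getline(fields, field, ',')) {
            row.push_back(std::stod(field));  // stod skips leading spaces
        }
        block.push_back(row);
    }
    return block;
}
```

Reading this way keeps only `maxRows` rows in memory at once, which is what lets chunked (online) processing handle files larger than memory.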


Data Matrix Structure

Code:

/* file: datastructures_soa.cpp */
/*******************************************************************************
!  Copyright(C) 2014-2015 Intel Corporation. All Rights Reserved.
*******************************************************************************/

#include "daal.h"
#include "service.h"

using namespace daal;
using namespace daal::data_management;

int main()
{
    std::cout << "Structure of array (SOA) numeric table example" << std::endl << std::endl;

    const size_t firstReadRow = 0;
    const size_t nRead = 4;
    size_t readFeatureIdx;

    /*Example of using an SOA numeric table*/
    const size_t nObservations = 10;
    const size_t nFeatures = 4;
    double dDataSOA[nObservations] = {1.0, 1.2, 1.4, 1.6, 1.8, 2.0, 2.2, 2.4, 2.6, 2.8};
    float  fDataSOA[nObservations] = {3.1f, 3.2f, 3.3f, 3.4f, 3.5f, 3.6f, 3.7f, 3.8f, 3.9f, 4.0f};
    int    iDataSOA[nObservations] = { -10, -20, -30, -40, -50, -60, -70, -80, -90, -100};
    int    cDataSOA[nObservations] = {1, 2, 3, 4, 5, 1, 2, 3, 4, 5};

    /* Construct an SOA numeric table with nObservations rows and nFeatures columns */
    SOANumericTable dataTable(nFeatures, nObservations);
    dataTable.setArray<int>   (cDataSOA, 0);
    dataTable.setArray<float> (fDataSOA, 1);
    dataTable.setArray<double>(dDataSOA, 2);
    dataTable.setArray<int>   (iDataSOA, 3);

    /* Read a block of rows */
    BlockDescriptor<double> doubleBlock;
    dataTable.getBlockOfRows(firstReadRow, nRead, readOnly, doubleBlock);
    printArray<double>(doubleBlock.getBlockPtr(), nFeatures, doubleBlock.getNumberOfRows(), "Print SOA data structures as double:");
    dataTable.releaseBlockOfRows(doubleBlock);

    /* Read a single feature (column) as int values */
    readFeatureIdx = 0;
    BlockDescriptor<int> intBlock;
    dataTable.getBlockOfColumnValues(readFeatureIdx, firstReadRow, nObservations, readOnly, intBlock);
    printArray<int>(intBlock.getBlockPtr(), 1, intBlock.getNumberOfRows(), "Print the first feature of SOA:");
    dataTable.releaseBlockOfColumnValues(intBlock);

    /* Get the dictionary and the number of features */
    NumericTableDictionary *pDictionary = dataTable.getDictionary();
    std::cout << "Number of features in table: " << pDictionary->getNumberOfFeatures() << std::endl;
    std::cout << std::endl;


    /* Keep the console window open on Windows; "pause" is a cmd.exe command */
#ifdef _WIN32
    system("pause");
#endif
    return 0;
}

Struct1.PNG
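In a structure-of-arrays (SOA) table each feature is stored as its own contiguous array, possibly with its own element type, which is why the example can attach `int`, `float` and `double` arrays to different columns and still read whole rows back as `double`. The plain C++ sketch below (not the DAAL API) mimics that layout with the same column types; `SoaTable` and `readRowAsDouble` are hypothetical names.

```cpp
#include <cstddef>
#include <vector>

// Structure of arrays: one contiguous array per feature (column),
// each with its own element type, mirroring the column types above.
struct SoaTable {
    std::vector<int>    c0;  // feature 0
    std::vector<float>  c1;  // feature 1
    std::vector<double> c2;  // feature 2
    std::vector<int>    c3;  // feature 3

    std::size_t numberOfRows() const { return c0.size(); }

    // Gather one row across the column arrays, converting every value to
    // double (similar in spirit to reading a block of rows as double).
    std::vector<double> readRowAsDouble(std::size_t row) const {
        return {static_cast<double>(c0[row]), static_cast<double>(c1[row]),
                c2[row], static_cast<double>(c3[row])};
    }
};
```

Reading a column is cheap in this layout (it is already one contiguous array), while reading a row requires gathering one value from each array, the opposite trade-off from a row-major (array of structures) table.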


Compressor

Code:

/* file: compressor.cpp */
/*******************************************************************************
!  Copyright(C) 2014-2015 Intel Corporation. All Rights Reserved.
!
!  The source code, information  and  material ("Material") contained herein is
!  owned  by Intel Corporation or its suppliers or licensors, and title to such
!  Material remains  with Intel Corporation  or its suppliers or licensors. The
!  Material  contains proprietary information  of  Intel or  its  suppliers and
!  licensors. The  Material is protected by worldwide copyright laws and treaty
!  provisions. No  part  of  the  Material  may  be  used,  copied, reproduced,
!  modified, published, uploaded, posted, transmitted, distributed or disclosed
!  in any way  without Intel's  prior  express written  permission. No  license
!  under  any patent, copyright  or  other intellectual property rights  in the
!  Material  is  granted  to  or  conferred  upon  you,  either  expressly,  by
!  implication, inducement,  estoppel or  otherwise.  Any  license  under  such
!  intellectual  property  rights must  be express  and  approved  by  Intel in
!  writing.
!
!  *Third Party trademarks are the property of their respective owners.
!
!  Unless otherwise  agreed  by Intel  in writing, you may not remove  or alter
!  this  notice or  any other notice embedded  in Materials by Intel or Intel's
!  suppliers or licensors in any way.
!
!*******************************************************************************
!  Content:
!    C++ example of a compressor
!
!******************************************************************************/

/**
 * <a name="DAAL-EXAMPLE-CPP-COMPRESSOR"></a>
 * \example compressor.cpp
 */

#include <queue>
#include "daal.h"
#include "service.h"

using namespace std;
using namespace daal;
using namespace data_management;

string datasetFileName  = "../data/batch/logitboost_train.csv";

DataBlock sentDataStream;        /* Data stream to compress and send */
DataBlock uncompressedDataBlock; /* Current block of data stream to compress */
DataBlock compressedDataBlock;   /* Current compressed block of data */
DataBlock receivedDataStream;    /* Received uncompressed data stream */

queue<DataBlock> sendReceiveQueue; /* Queue for sending and receiving compressed data blocks */

const size_t maxDataBlockSize = 16384; /* Maximum size of a data block */

bool getUncompressedDataBlock(DataBlock &block);
void sendCompressedDataBlock(DataBlock &block);
bool receiveCompressedDataBlock(DataBlock &block);
void prepareMemory();
void releaseMemory();
void printCRC32();

int main(int argc, char *argv[])
{
    checkArguments(argc, argv, 1, &datasetFileName);

    /* Read data from a file and allocate memory */
    prepareMemory();

    /* Create a compressor */
    Compressor<zlib> compressor;

    /* Receive the next data block for compression */
    while(getUncompressedDataBlock(uncompressedDataBlock))
    {
        /* Associate data to compress with the compressor */
        compressor.setInputDataBlock(uncompressedDataBlock);

        /* Memory for a compressed block might not be enough to compress the input block at once */
        do
        {
            /* Compress uncompressedDataBlock to compressedDataBlock */
            compressor.run(compressedDataBlock.getPtr(), maxDataBlockSize, 0);

            /* Get the actual size of a compressed block */
            compressedDataBlock.setSize(compressor.getUsedOutputDataBlockSize());

            /* Send the current compressed block */
            sendCompressedDataBlock(compressedDataBlock);
        }
        /* Check if an additional data block is needed to complete compression */
        while (compressor.isOutputDataBlockFull());
    }

    /* Create a decompressor */
    Decompressor<zlib> decompressor;

    /* Receive compressed data by blocks */
    while(receiveCompressedDataBlock(compressedDataBlock))
    {
        /* Associate compressed data with the decompressor */
        decompressor.setInputDataBlock(compressedDataBlock);

        /* Decompress an incoming block to the end of receivedDataStream */
        decompressor.run(receivedDataStream.getPtr(), maxDataBlockSize, receivedDataStream.getSize());

        /* Update the size of actual data in receivedDataStream */
        receivedDataStream.setSize(receivedDataStream.getSize() + decompressor.getUsedOutputDataBlockSize());
    }

    /* Compute and print checksums for sentDataStream and receivedDataStream */
    printCRC32();

    releaseMemory();

    return 0;
}

void prepareMemory()
{
    /* Allocate sentDataStream and read an input file */
    byte *data;
    sentDataStream.setSize(readTextFile(datasetFileName, &data));
    sentDataStream.setPtr(data);

    byte *compressedData = (byte *)daal::services::daal_malloc(maxDataBlockSize);
    checkAllocation(compressedData);
    compressedDataBlock.setPtr(compressedData);
    compressedDataBlock.setSize(maxDataBlockSize);

    byte *receivedData = (byte *)daal::services::daal_malloc(sentDataStream.getSize());
    checkAllocation(receivedData);
    receivedDataStream.setPtr(receivedData);
}

bool getUncompressedDataBlock(DataBlock &block)
{
    static size_t availableDataSize = sentDataStream.getSize();

    /* Calculate the current block size and ptr */
    if(availableDataSize >= maxDataBlockSize)
    {
        block.setSize(maxDataBlockSize);
        block.setPtr(sentDataStream.getPtr() + sentDataStream.getSize() - availableDataSize);
        availableDataSize -= maxDataBlockSize;
    }
    else if((availableDataSize < maxDataBlockSize) && (availableDataSize > 0))
    {
        block.setSize(availableDataSize);
        block.setPtr(sentDataStream.getPtr() + sentDataStream.getSize() - availableDataSize);
        availableDataSize = 0;
    }
    else
    {
        return false;
    }

    return true;
}

void sendCompressedDataBlock(DataBlock &block)
{
    DataBlock currentBlock;

    /* Allocate memory for the current compressed block in the queue */
    byte *currentPtr = (byte *)daal::services::daal_malloc(block.getSize());
    checkAllocation(currentPtr);
    currentBlock.setPtr(currentPtr);

    /* Copy an incoming block to the current compressed block */
    currentBlock.setSize(block.getSize());
    copyBytes(currentBlock.getPtr(), block.getPtr(), currentBlock.getSize());

    /* Push the current compressed block to the queue */
    sendReceiveQueue.push(currentBlock);

    return;
}

bool receiveCompressedDataBlock(DataBlock &block)
{
    DataBlock currentBlock;

    /* Stop at the end of the queue */
    if(sendReceiveQueue.empty())
    {
        return false;
    }

    /* Receive the current compressed block from the queue */
    currentBlock = sendReceiveQueue.front();
    block.setSize(currentBlock.getSize());
    copyBytes(block.getPtr(), currentBlock.getPtr(), block.getSize());

    /* Release memory of the current compressed block in the queue */
    daal::services::daal_free(currentBlock.getPtr());

    sendReceiveQueue.pop();

    return true;
}

void printCRC32()
{
    unsigned int crcSentDataStream = 0;
    unsigned int crcReceivedDataStream = 0;

    /* Compute checksums for full input data and full received data */
    crcSentDataStream = getCRC32(sentDataStream.getPtr(), crcSentDataStream, sentDataStream.getSize());
    crcReceivedDataStream = getCRC32(receivedDataStream.getPtr(), crcReceivedDataStream, receivedDataStream.getSize());

    cout << endl << "Compression example program results:" << endl << endl;

    cout << "Input data checksum:    0x" << hex << crcSentDataStream << endl;
    cout << "Received data checksum: 0x" << hex << crcReceivedDataStream << endl;

    if (sentDataStream.getSize() != receivedDataStream.getSize())
    {
        cout << "ERROR: Received data size mismatches with the sent data size" << endl;
    }
    else if (crcSentDataStream != crcReceivedDataStream)
    {
        cout << "ERROR: Received data CRC mismatches with the sent data CRC" << endl;
    }
    else
    {
        cout << "OK: Received data CRC matches with the sent data CRC" << endl;
    }
}

void releaseMemory()
{
    if(compressedDataBlock.getPtr())
    {
        daal::services::daal_free(compressedDataBlock.getPtr());
    }
    if(receivedDataStream.getPtr())
    {
        daal::services::daal_free(receivedDataStream.getPtr());
    }
    if(sentDataStream.getPtr())
    {
        delete [] sentDataStream.getPtr();
    }
}

DAAL Compressor
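The compressor example verifies the round trip by comparing CRC-32 checksums of the sent and received streams, computed with the example's `getCRC32` helper, which is called with the previous result so a checksum can be continued across buffers. For reference, below is a plain C++ sketch of the widely used bitwise CRC-32 (IEEE 802.3, reflected polynomial 0xEDB88320) with the same chaining style; it is an assumption that `getCRC32` computes this exact variant, and `crc32` here is a hypothetical helper.

```cpp
#include <cstddef>
#include <cstdint>

// Bitwise CRC-32 (IEEE 802.3, reflected polynomial 0xEDB88320).
// Start with crc = 0; pass a previous result to continue over more data.
std::uint32_t crc32(const unsigned char* data, std::size_t len, std::uint32_t crc = 0) {
    crc = ~crc;  // undo the final inversion of a previous call (0 -> 0xFFFFFFFF)
    for (std::size_t i = 0; i < len; ++i) {
        crc ^= data[i];
        for (int bit = 0; bit < 8; ++bit) {
            // Shift right; if the bit shifted out was 1, XOR in the polynomial.
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
        }
    }
    return ~crc;  // final inversion
}
```

For the standard test string "123456789" this returns 0xCBF43926, whether it is computed in one call or split across two chained calls. Matching checksums make corruption very unlikely but do not strictly prove equality, which is why the example also compares the stream sizes.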


Useful Links

  1. https://software.intel.com/en-us/daal
  2. Sorting: https://software.intel.com/en-us/node/610127
  3. Data Structure: https://software.intel.com/en-us/node/599565
  4. Compressor: https://software.intel.com/en-us/node/599552
  5. DAAL Guide Book: https://software.intel.com/en-us/daal-programming-guide