SLEEPy
Intel Data Analytics Acceleration Library (DAAL)
Team Member
Intro OLD
Local DAAL Examples Location: C:\Program Files (x86)\IntelSWTools\compilers_and_libraries_2016\windows\daal\examples
Data: http://open.canada.ca/data/en/dataset/cad804cd-454e-4bd7-9f22-fcee64f60719
New Data: http://open.canada.ca/data/en/dataset/be3880f2-0d04-4583-8265-611b231ebce8
Parser code: https://software.intel.com/en-us/node/610127
Low Order Moments: https://software.intel.com/en-us/node/599561
Compressor: https://software.intel.com/en-us/node/599552
Our goal is to parse and process this crime data and add more meaning to it, using various parallel techniques taught in the course and comparing them through the DAAL library.
Introduction
What is Big Data
Big Data refers to data sets so large that traditional data processing methods cannot keep up with them. Much of this data is related to human behavior that computers can measure, such as trends and web analytics. With so many people using technology, the data gathered this way is becoming massive, which is one of the reasons for the rise of Big Data.
What is DAAL
DAAL is a C++ and Java/Scala library for data analytics. It is similar to MKL (Math Kernel Library), with some differences:
- MKL focuses on computation, while DAAL focuses on the entire data flow (acquisition, transformation, processing).
- DAAL is optimized for all kinds of Intel-based devices, from data centers to home computers.
DAAL supports three processing modes:
- Offline processing (Batch) - The data fits in memory and can be processed all at once.
- Online processing (Streaming) - The data is too big for memory, so DAAL processes it in chunks and combines the partial results into the final result.
- Distributed processing - Data processing is spread across multiple nodes. DAAL does not impose a communication method and leaves that choice to the developer (Hadoop, Spark, MPI, etc.).
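To make the difference between batch and online processing concrete, here is a small plain C++ sketch. It does not use DAAL, and the names Partial, processChunk, merge, finalize, batchMoments and onlineMoments are our own hypothetical ones. It computes the mean and variance of one column either all at once or chunk by chunk, keeping only a partial result (count, sum, sum of squares) between chunks. The same merge step is what a distributed setup would use to combine partial results from different nodes. DAAL's Low Order Moments algorithm (linked above) does this inside the library, but this sketch does not show its API.

```cpp
// Plain C++ sketch (not the DAAL API): mean and variance computed in batch
// mode and in online mode from per-chunk partial results.
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Partial result for one chunk: enough information to merge with other chunks.
struct Partial {
    std::size_t n = 0;
    double sum = 0.0;
    double sumSq = 0.0;
};

struct Moments {
    double mean;
    double variance; // population variance
};

// Process the elements [begin, end): one step in online mode, or one node's
// share of the data in distributed mode.
Partial processChunk(const std::vector<double> &data, std::size_t begin, std::size_t end) {
    Partial p;
    for (std::size_t k = begin; k < end; ++k) {
        p.n += 1;
        p.sum += data[k];
        p.sumSq += data[k] * data[k];
    }
    return p;
}

// Combine two partial results (two chunks, or results from two nodes).
Partial merge(const Partial &a, const Partial &b) {
    Partial r;
    r.n = a.n + b.n;
    r.sum = a.sum + b.sum;
    r.sumSq = a.sumSq + b.sumSq;
    return r;
}

// Turn the combined partial result into the final answer.
// Assumes at least one value was processed (p.n > 0).
Moments finalize(const Partial &p) {
    const double mean = p.sum / static_cast<double>(p.n);
    return Moments{mean, p.sumSq / static_cast<double>(p.n) - mean * mean};
}

// Batch mode: all data fits in memory and is processed at once.
Moments batchMoments(const std::vector<double> &data) {
    return finalize(processChunk(data, 0, data.size()));
}

// Online mode: data arrives in chunks; only the running partial result is kept.
Moments onlineMoments(const std::vector<double> &data, std::size_t chunkSize) {
    Partial running;
    for (std::size_t begin = 0; begin < data.size(); begin += chunkSize) {
        const std::size_t end = std::min(begin + chunkSize, data.size());
        running = merge(running, processChunk(data, begin, end));
    }
    return finalize(running);
}
```

For example, the values 1 to 7 give a mean of 4 and a population variance of 4 in both modes, whatever chunk size is used. The sum-of-squares formula is the easiest to merge but can lose precision on large values, so a real implementation would likely use a more numerically stable update.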
How DAAL Works
Code Examples
Batch Sorting
CSV Data:
-55.558252,63.051427,-27.793776
-75.622534,61.212279,-16.283311
-86.747071,-28.132241,-17.824316
-34.172101,-51.404172,14.670925
-61.506308,48.248030,-99.235341
9.746765,-89.879258,55.561778
48.896723,-32.648097,48.313603
-15.346015,9.769776,-33.483281
56.726081,-87.272631,8.724224
-1.926802,54.960580,-78.723429
45.237223,-79.764218,-47.271926
84.138339,11.547818,-92.962952
46.711824,-42.623510,-34.664673
55.813112,19.803475,4.807766
-55.474098,-72.163755,89.425736
-7.566596,-77.829218,58.630172
-76.081937,-12.089445,-44.065054
-25.888944,46.425499,-37.515164
-30.201387,-16.237217,-50.716591
-88.085869,60.136249,54.812866
Code:
/* file: sorting_batch.cpp
* Copyright 2014-2016 Intel Corporation All Rights Reserved.*/
#include "daal.h"
#include "service.h"
using namespace daal;
using namespace daal::algorithms;
using namespace daal::data_management;
using namespace std;
/* Input data set parameters */
string datasetFileName = "../data/batch/sorting.csv";
int main(int argc, char *argv[])
{
    checkArguments(argc, argv, 1, &datasetFileName);
    /* Initialize FileDataSource<CSVFeatureManager> to retrieve the input data from a .csv file */
    FileDataSource<CSVFeatureManager> dataSource(datasetFileName, DataSource::doAllocateNumericTable, DataSource::doDictionaryFromContext);
    /* Retrieve the data from the input file */
    dataSource.loadDataBlock();
    /* Create algorithm objects to sort data using the default (radix) method */
    sorting::Batch<> algorithm;
    /* Print the input observations matrix */
    printNumericTable(dataSource.getNumericTable(), "Initial matrix of observations:");
    /* Set input objects for the algorithm */
    algorithm.input.set(sorting::data, dataSource.getNumericTable());
    /* Sort data observations */
    algorithm.compute();
    /* Get the sorting result */
    services::SharedPtr<sorting::Result> res = algorithm.getResult();
    printNumericTable(res->get(sorting::sortedData), "Sorted matrix of observations:");
    return 0;
}
Results:
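Each column is sorted independently from smallest to largest, so the values in one row of the result generally do not come from the same original observation.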
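The per-column behaviour can be reproduced in plain C++ to sanity-check the output. This sketch assumes the data is held as a vector of rows of equal length and uses std::sort (a comparison sort) rather than the radix sort named in the DAAL example; sortColumns is a hypothetical helper, not a DAAL function.

```cpp
// Plain C++ sketch: sort each column of a row-major matrix independently,
// matching the "smallest to largest per column" result of the sorting example.
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

using Matrix = std::vector<std::vector<double>>; // one inner vector per observation (row)

// Assumes every row has the same number of columns.
Matrix sortColumns(const Matrix &rows) {
    if (rows.empty()) return rows;
    const std::size_t nRows = rows.size();
    const std::size_t nCols = rows[0].size();
    Matrix out(nRows, std::vector<double>(nCols));
    std::vector<double> column(nRows);
    for (std::size_t c = 0; c < nCols; ++c) {
        for (std::size_t r = 0; r < nRows; ++r) column[r] = rows[r][c]; // gather column c
        std::sort(column.begin(), column.end());                        // sort it on its own
        for (std::size_t r = 0; r < nRows; ++r) out[r][c] = column[r];  // write it back
    }
    return out;
}
```

Applied to the first three rows of the CSV data, the first column becomes -86.747071, -75.622534, -55.558252 while the second becomes -28.132241, 61.212279, 63.051427, which shows that rows are not kept together.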
Data Blocks:
With the second call commented out:
dataSource.loadDataBlock(5);
//dataSource.loadDataBlock(5);
With both calls active:
dataSource.loadDataBlock(5);
dataSource.loadDataBlock(5);
"Blocks" of data are loaded 5 rows at a time. This lets us easily section off the data for processing, and it is also one way of distributing data across processes, for example with MPI.
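The block idea can also be sketched without DAAL. The hypothetical loadBlock helper below reads at most blockSize rows from a CSV stream per call, so repeated calls walk through the file the way repeated loadDataBlock(5) calls do. It is an illustration under our own assumptions, not how FileDataSource is implemented. Empty fields are skipped so that trailing commas are tolerated.

```cpp
// Plain C++ sketch (not DAAL's FileDataSource): read a CSV stream in blocks
// of at most blockSize rows per call.
#include <cassert>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

using Row = std::vector<double>;

// Hypothetical helper: parse up to blockSize rows from `in`.
// Returns fewer rows (possibly none) once the stream runs out.
std::vector<Row> loadBlock(std::istream &in, std::size_t blockSize) {
    std::vector<Row> block;
    std::string line;
    while (block.size() < blockSize && std::getline(in, line)) {
        if (line.empty()) continue; // skip blank lines
        Row row;
        std::stringstream fields(line);
        std::string field;
        while (std::getline(fields, field, ','))
            if (!field.empty()) row.push_back(std::stod(field)); // stod skips leading spaces
        block.push_back(row);
    }
    return block;
}
```

With a 12-row file and a block size of 5, successive calls return 5, 5 and then 2 rows, and an empty block signals the end. Each block could then be processed on its own or sent to a different MPI rank; the MPI part is not shown here.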
Data Matrix Structure
Code:
/* file: datastructures_soa.cpp */
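/*******************************************************************************
! Copyright(C) 2014-2015 Intel Corporation. All Rights Reserved.
*******************************************************************************/
#include "daal.h"
#include "service.h"
using namespace daal;
using namespace daal::data_management;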
const char *toString(DataFeatureUtils::FeatureType v);
const char *toString(DataFeatureUtils::InternalNumType v);
int main()
{
    std::cout << "Structure of array (SOA) numeric table example" << std::endl << std::endl;
    const size_t firstReadRow = 0;
    const size_t nRead = 4;
    size_t readFeatureIdx;
    /* Example of using an SOA numeric table */
    const size_t nObservations = 10;
    const size_t nFeatures = 4;
    /* One array per feature (column); each column can have its own type */
    double dDataSOA[nObservations] = {1.0, 1.2, 1.4, 1.6, 1.8, 2.0, 2.2, 2.4, 2.6, 2.8};
    float fDataSOA[nObservations] = {3.1f, 3.2f, 3.3f, 3.4f, 3.5f, 3.6f, 3.7f, 3.8f, 3.9f, 4.0f};
    int iDataSOA[nObservations] = { -10, -20, -30, -40, -50, -60, -70, -80, -90, -100};
    int cDataSOA[nObservations] = {1, 2, 3, 4, 5, 1, 2, 3, 4, 5};
    /* Construct an SOA numeric table with nObservations rows and nFeatures columns */
    SOANumericTable dataTable(nFeatures, nObservations);
    dataTable.setArray<int>   (cDataSOA, 0);
    dataTable.setArray<float> (fDataSOA, 1);
    dataTable.setArray<double>(dDataSOA, 2);
    dataTable.setArray<int>   (iDataSOA, 3);
    /* Read a block of rows (all features converted to double) */
    BlockDescriptor<double> doubleBlock;
    dataTable.getBlockOfRows(firstReadRow, nRead, readOnly, doubleBlock);
    printArray<double>(doubleBlock.getBlockPtr(), nFeatures, doubleBlock.getNumberOfRows(), "Print SOA data structures as double:");
    dataTable.releaseBlockOfRows(doubleBlock);
    /* Read a single feature (column) */
    readFeatureIdx = 0;
    BlockDescriptor<int> intBlock;
    dataTable.getBlockOfColumnValues(readFeatureIdx, firstReadRow, nObservations, readOnly, intBlock);
    printArray<int>(intBlock.getBlockPtr(), 1, intBlock.getNumberOfRows(), "Print the first feature of SOA:");
    dataTable.releaseBlockOfColumnValues(intBlock);
    /* Get the dictionary and the number of features */
    NumericTableDictionary *pDictionary = dataTable.getDictionary();
    std::cout << "Number of features in table: " << pDictionary->getNumberOfFeatures() << std::endl;
    std::cout << std::endl;
    system("pause"); /* Windows only: keeps the console window open */
    return 0;
}
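The example stores each feature (column) as its own array, which is what "structure of arrays" (SOA) means; the opposite layout, an "array of structures" (AOS), stores each observation (row) together. The plain C++ sketch below contrasts the two using the same four column types as the example (int, float, double, int). TableSOA, ObservationAOS, rowAsDouble and toAOS are hypothetical names for illustration, not DAAL types; rowAsDouble only mimics, in spirit, reading a block of rows as double.

```cpp
// Plain C++ sketch of the two layouts (not DAAL's SOANumericTable).
#include <cassert>
#include <cstddef>
#include <vector>

// Array of structures: one struct per observation (row).
struct ObservationAOS {
    int category;
    float f;
    double d;
    int i;
};

// Structure of arrays: one contiguous array per feature (column).
struct TableSOA {
    std::vector<int> category;
    std::vector<float> f;
    std::vector<double> d;
    std::vector<int> i;

    std::size_t rows() const { return category.size(); }

    // Gather one row, converting every feature to double.
    std::vector<double> rowAsDouble(std::size_t r) const {
        return {static_cast<double>(category[r]), static_cast<double>(f[r]), d[r],
                static_cast<double>(i[r])};
    }
};

// Convert the column-wise table into a row-wise one.
std::vector<ObservationAOS> toAOS(const TableSOA &t) {
    std::vector<ObservationAOS> out;
    out.reserve(t.rows());
    for (std::size_t r = 0; r < t.rows(); ++r)
        out.push_back(ObservationAOS{t.category[r], t.f[r], t.d[r], t.i[r]});
    return out;
}
```

SOA keeps each column contiguous, which suits column-wise work such as per-column sorting or moments and lets every column keep its own type; AOS is more convenient when whole observations are read one at a time.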
Compressor
Code:
/* file: compressor.cpp */
/*******************************************************************************
! Copyright(C) 2014-2015 Intel Corporation. All Rights Reserved.
!
! The source code, information and material ("Material") contained herein is
! owned by Intel Corporation or its suppliers or licensors, and title to such
! Material remains with Intel Corporation or its suppliers or licensors. The
! Material contains proprietary information of Intel or its suppliers and
! licensors. The Material is protected by worldwide copyright laws and treaty
! provisions. No part of the Material may be used, copied, reproduced,
! modified, published, uploaded, posted, transmitted, distributed or disclosed
! in any way without Intel's prior express written permission. No license
! under any patent, copyright or other intellectual property rights in the
! Material is granted to or conferred upon you, either expressly, by
! implication, inducement, estoppel or otherwise. Any license under such
! intellectual property rights must be express and approved by Intel in
! writing.
!
! *Third Party trademarks are the property of their respective owners.
!
! Unless otherwise agreed by Intel in writing, you may not remove or alter
! this notice or any other notice embedded in Materials by Intel or Intel's
! suppliers or licensors in any way.
!
!*******************************************************************************
! Content:
! C++ example of a compressor
!
!******************************************************************************/
/**
* <a name="DAAL-EXAMPLE-CPP-COMPRESSOR"></a>
* \example compressor.cpp
*/
#include <queue>
#include "daal.h"
#include "service.h"
using namespace std;
using namespace daal;
using namespace data_management;
string datasetFileName = "../data/batch/logitboost_train.csv";
DataBlock sentDataStream; /* Data stream to compress and send */
DataBlock uncompressedDataBlock; /* Current block of data stream to compress */
DataBlock compressedDataBlock; /* Current compressed block of data */
DataBlock receivedDataStream; /* Received uncompressed data stream */
queue<DataBlock> sendReceiveQueue; /* Queue for sending and receiving compressed data blocks */
const size_t maxDataBlockSize = 16384; /* Maximum size of a data block */
bool getUncompressedDataBlock(DataBlock &block);
void sendCompressedDataBlock(DataBlock &block);
bool receiveCompressedDataBlock(DataBlock &block);
void prepareMemory();
void releaseMemory();
void printCRC32();
int main(int argc, char *argv[])
{
    checkArguments(argc, argv, 1, &datasetFileName);
    /* Read data from a file and allocate memory */
    prepareMemory();
    /* Create a compressor */
    Compressor<zlib> compressor;
    /* Receive the next data block for compression */
    while(getUncompressedDataBlock(uncompressedDataBlock))
    {
        /* Associate data to compress with the compressor */
        compressor.setInputDataBlock(uncompressedDataBlock);
        /* Memory for a compressed block might not be enough to compress the input block at once */
        do
        {
            /* Compress uncompressedDataBlock to compressedDataBlock */
            compressor.run(compressedDataBlock.getPtr(), maxDataBlockSize, 0);
            /* Get the actual size of a compressed block */
            compressedDataBlock.setSize(compressor.getUsedOutputDataBlockSize());
            /* Send the current compressed block */
            sendCompressedDataBlock(compressedDataBlock);
        }
        /* Check if an additional data block is needed to complete compression */
        while (compressor.isOutputDataBlockFull());
    }
    /* Create a decompressor */
    Decompressor<zlib> decompressor;
    /* Receive compressed data by blocks */
    while(receiveCompressedDataBlock(compressedDataBlock))
    {
        /* Associate compressed data with the decompressor */
        decompressor.setInputDataBlock(compressedDataBlock);
        /* Decompress an incoming block to the end of receivedDataStream */
        decompressor.run(receivedDataStream.getPtr(), maxDataBlockSize, receivedDataStream.getSize());
        /* Update the size of actual data in receivedDataStream */
        receivedDataStream.setSize(receivedDataStream.getSize() + decompressor.getUsedOutputDataBlockSize());
    }
    /* Compute and print checksums for sentDataStream and receivedDataStream */
    printCRC32();
    releaseMemory();
    return 0;
}
void prepareMemory()
{
    /* Allocate sentDataStream and read an input file */
    byte *data;
    sentDataStream.setSize(readTextFile(datasetFileName, &data));
    sentDataStream.setPtr(data);
    byte *compressedData = (byte *)daal::services::daal_malloc(maxDataBlockSize);
    checkAllocation(compressedData);
    compressedDataBlock.setPtr(compressedData);
    compressedDataBlock.setSize(maxDataBlockSize);
    byte *receivedData = (byte *)daal::services::daal_malloc(sentDataStream.getSize());
    checkAllocation(receivedData);
    receivedDataStream.setPtr(receivedData);
}
bool getUncompressedDataBlock(DataBlock &block)
{
    static size_t availableDataSize = sentDataStream.getSize();
    /* Calculate the current block size and ptr */
    if(availableDataSize >= maxDataBlockSize)
    {
        block.setSize(maxDataBlockSize);
        block.setPtr(sentDataStream.getPtr() + sentDataStream.getSize() - availableDataSize);
        availableDataSize -= maxDataBlockSize;
    }
    else if((availableDataSize < maxDataBlockSize) && (availableDataSize > 0))
    {
        block.setSize(availableDataSize);
        block.setPtr(sentDataStream.getPtr() + sentDataStream.getSize() - availableDataSize);
        availableDataSize = 0;
    }
    else
    {
        return false;
    }
    return true;
}
void sendCompressedDataBlock(DataBlock &block)
{
    DataBlock currentBlock;
    /* Allocate memory for the current compressed block in the queue */
    byte *currentPtr = (byte *)daal::services::daal_malloc(block.getSize());
    checkAllocation(currentPtr);
    currentBlock.setPtr(currentPtr);
    /* Copy an incoming block to the current compressed block */
    currentBlock.setSize(block.getSize());
    copyBytes(currentBlock.getPtr(), block.getPtr(), currentBlock.getSize());
    /* Push the current compressed block to the queue */
    sendReceiveQueue.push(currentBlock);
    return;
}
bool receiveCompressedDataBlock(DataBlock &block)
{
    DataBlock currentBlock;
    /* Stop at the end of the queue */
    if(sendReceiveQueue.empty())
    {
        return false;
    }
    /* Receive the current compressed block from the queue */
    currentBlock = sendReceiveQueue.front();
    block.setSize(currentBlock.getSize());
    copyBytes(block.getPtr(), currentBlock.getPtr(), block.getSize());
    /* Release memory of the current compressed block in the queue */
    daal::services::daal_free(currentBlock.getPtr());
    sendReceiveQueue.pop();
    return true;
}
void printCRC32()
{
    unsigned int crcSentDataStream = 0;
    unsigned int crcReceivedDataStream = 0;
    /* Compute checksums for full input data and full received data */
    crcSentDataStream = getCRC32(sentDataStream.getPtr(), crcSentDataStream, sentDataStream.getSize());
    crcReceivedDataStream = getCRC32(receivedDataStream.getPtr(), crcReceivedDataStream, receivedDataStream.getSize());
    cout << endl << "Compression example program results:" << endl << endl;
    cout << "Input data checksum:    0x" << hex << crcSentDataStream << endl;
    cout << "Received data checksum: 0x" << hex << crcReceivedDataStream << endl;
    if (sentDataStream.getSize() != receivedDataStream.getSize())
    {
        cout << "ERROR: Received data size mismatches with the sent data size" << endl;
    }
    else if (crcSentDataStream != crcReceivedDataStream)
    {
        cout << "ERROR: Received data CRC mismatches with the sent data CRC" << endl;
    }
    else
    {
        cout << "OK: Received data CRC matches with the sent data CRC" << endl;
    }
}
void releaseMemory()
{
    if(compressedDataBlock.getPtr())
    {
        daal::services::daal_free(compressedDataBlock.getPtr());
    }
    if(receivedDataStream.getPtr())
    {
        daal::services::daal_free(receivedDataStream.getPtr());
    }
    if(sentDataStream.getPtr())
    {
        delete [] sentDataStream.getPtr();
    }
}
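The transport part of the compressor example (split the input into blocks of at most maxDataBlockSize bytes, push them through a queue, reassemble them and compare checksums) can be sketched in plain C++ with the compression step left out. The CRC-32 below is the common reflected variant with polynomial 0xEDB88320; we are assuming, not confirming, that getCRC32() from service.h computes something equivalent. crc32, sendInBlocks and receiveAll are hypothetical names.

```cpp
// Plain C++ sketch of the compressor example's send/receive loop without
// compression: split into blocks, queue them, reassemble, compare CRC-32.
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <queue>
#include <string>
#include <vector>

using Bytes = std::vector<unsigned char>;

// Standard reflected CRC-32 (polynomial 0xEDB88320), bit-by-bit version.
std::uint32_t crc32(const Bytes &data) {
    std::uint32_t crc = 0xFFFFFFFFu;
    for (unsigned char byte : data) {
        crc ^= byte;
        for (int bit = 0; bit < 8; ++bit)
            crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u))); // xor poly if low bit set
    }
    return crc ^ 0xFFFFFFFFu;
}

// "Send": cut the stream into blocks of at most maxBlockSize bytes.
std::queue<Bytes> sendInBlocks(const Bytes &stream, std::size_t maxBlockSize) {
    std::queue<Bytes> q;
    for (std::size_t pos = 0; pos < stream.size(); pos += maxBlockSize) {
        const std::size_t len = std::min(maxBlockSize, stream.size() - pos);
        const auto first = stream.begin() + static_cast<std::ptrdiff_t>(pos);
        q.push(Bytes(first, first + static_cast<std::ptrdiff_t>(len)));
    }
    return q;
}

// "Receive": append each queued block to the output until the queue is empty.
Bytes receiveAll(std::queue<Bytes> &q) {
    Bytes out;
    while (!q.empty()) {
        out.insert(out.end(), q.front().begin(), q.front().end());
        q.pop();
    }
    return out;
}
```

The CRC-32 of the ASCII string "123456789" should be 0xCBF43926, the usual check value for this variant. Inserting a lossless compressor and decompressor between sendInBlocks and receiveAll (zlib in the DAAL example) should leave the final checksum comparison unchanged.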