SLEEPy
Intel Data Analytics Acceleration Library (DAAL)
Team Member
Introduction
What is Big Data
Big Data refers to data sets so large that traditional data processing methods cannot keep up. Much of this data comes from human behavior that computers can measure, such as trends and web analytics. As more people use technology, the volume of data gathered this way keeps growing, which is one reason for the rise of Big Data.
What is DAAL
DAAL is a C++ and Java/Scala library for data analytics. It is similar to MKL (Math Kernel Library), with some differences:
- MKL focuses on computation. DAAL covers the entire data flow (acquisition, transformation, processing).
- DAAL is optimized for all kinds of Intel-based devices (from data centers to home computers).
- DAAL uses vectorization for optimization.
DAAL supports three processing modes:
- Offline processing (batch): the data fits in memory and is processed all at once.
- Online processing (streaming): the data is too big for memory, so DAAL processes it in chunks and combines the partial results into the final result.
- Distributed processing: the data processing is distributed. DAAL does not prescribe a communication method and leaves that choice to the developer (Hadoop, Spark, MPI, etc.).
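The idea behind the online and distributed modes, combining partial results into a final result, can be sketched in plain C++ without DAAL. The sketch below computes a mean chunk by chunk. The names PartialMean, update, merge and finalize are hypothetical and chosen only for illustration; they are not DAAL classes or functions.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical partial result: not a DAAL type, only an illustration.
struct PartialMean {
    double sum = 0.0;
    std::size_t count = 0;
};

// Online step: fold one chunk of data into the running partial result.
void update(PartialMean &partial, const std::vector<double> &chunk) {
    for (double value : chunk) {
        partial.sum += value;
    }
    partial.count += chunk.size();
}

// Distributed step: merge two partial results computed on different nodes.
PartialMean merge(const PartialMean &a, const PartialMean &b) {
    PartialMean merged;
    merged.sum = a.sum + b.sum;
    merged.count = a.count + b.count;
    return merged;
}

// Final step: turn the accumulated partial result into the answer.
double finalize(const PartialMean &partial) {
    assert(partial.count > 0);  // a mean of zero values is undefined
    return partial.sum / static_cast<double>(partial.count);
}
```

Only the small partial result has to stay in memory or travel between nodes, never the full data set, which is why the same pattern serves both streaming and distributed processing.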
How DAAL Works
Installing DAAL
DAAL is bundled with Intel® Parallel Studio XE 2016 Composer, Professional, or Cluster Edition, but you can also purchase a stand-alone version. After installing it, you have to run a batch file to set up the environment.
By default, <install dir> is C:\Program files (x86)\IntelSWTools\compilers_and_libraries_2016.x.xxx\windows, and the batch file that sets up the environment is located under it.
Set Environment Variables
Run the <install dir>\daal\bin\daalvars.bat script as appropriate to your target architecture.
IA-32 architecture:
daalvars.bat ia32
Intel® 64 architecture:
daalvars.bat intel64
Compiling
Link with the standard threaded version of Intel DAAL:
/Qdaal or /Qdaal:parallel
Link with the sequential version of Intel DAAL:
/Qdaal:sequential
Visual Studio:
- In Solution Explorer, go to Project > Properties > Configuration Properties > Intel Performance Libraries.
- From the Use Intel DAAL drop-down menu, select the appropriate linking method. For example: Multi-threaded Static Library.
NOTE: DAAL may require you to also enable TBB.
Library Files
 | Single-threaded (non-threaded) Intel DAAL | Multi-threaded (internally threaded) Intel DAAL |
---|---|---|
Static linking | daal_core.lib daal_sequential.lib | daal_core.lib daal_thread.lib |
Dynamic linking | daal_core_dll.lib | daal_core_dll.lib |
Code Examples
Batch Sorting
CSV Data:
-55.558252,63.051427,-27.793776
-75.622534,61.212279,-16.283311
-86.747071,-28.132241,-17.824316
-34.172101,-51.404172,14.670925
-61.506308,48.248030,-99.235341
9.746765,-89.879258,55.561778
48.896723,-32.648097,48.313603
-15.346015,9.769776,-33.483281
56.726081,-87.272631,8.724224
-1.926802,54.960580,-78.723429
45.237223,-79.764218,-47.271926
84.138339,11.547818,-92.962952
46.711824,-42.623510,-34.664673
55.813112,19.803475,4.807766
-55.474098,-72.163755,89.425736
-7.566596,-77.829218,58.630172
-76.081937,-12.089445,-44.065054
-25.888944,46.425499,-37.515164
-30.201387,-16.237217,-50.716591
-88.085869,60.136249,54.812866
Code:
/* file: sorting_batch.cpp
* Copyright 2014-2016 Intel Corporation All Rights Reserved.*/
#include "daal.h"
#include "service.h"
using namespace daal;
using namespace daal::algorithms;
using namespace daal::data_management;
using namespace std;
/* Input data set parameters */
string datasetFileName = "../data/batch/sorting.csv";
int main(int argc, char *argv[])
{
checkArguments(argc, argv, 1, &datasetFileName);
/* Initialize FileDataSource<CSVFeatureManager> to retrieve the input data from a .csv file */
FileDataSource<CSVFeatureManager> dataSource(datasetFileName, DataSource::doAllocateNumericTable, DataSource::doDictionaryFromContext);
/* Retrieve the data from the input file */
dataSource.loadDataBlock();
/* Create algorithm objects to sort data using the default (radix) method */
sorting::Batch<> algorithm;
/* Print the input observations matrix */
printNumericTable(dataSource.getNumericTable(), "Initial matrix of observations:");
/* Set input objects for the algorithm */
algorithm.input.set(sorting::data, dataSource.getNumericTable());
/* Sort data observations */
algorithm.compute();
/* Get the sorting result */
services::SharedPtr<sorting::Result> res = algorithm.getResult();
printNumericTable(res->get(sorting::sortedData), "Sorted matrix of observations:");
return 0;
}
Results:
The data is sorted from smallest to largest per column.
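To make "per column" concrete, here is a minimal plain C++ sketch that does not use DAAL. It assumes, as the result description suggests, that each column is ordered on its own, so values from one input row do not stay together. The alias Matrix and the function sortColumns are hypothetical names, and std::sort stands in for whatever method DAAL uses internally.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

using Matrix = std::vector<std::vector<double>>;  // one inner vector per row

// Return a copy of the input with every column sorted in ascending order.
// Each column is reordered independently of the others.
Matrix sortColumns(const Matrix &input) {
    Matrix sorted = input;
    if (sorted.empty()) {
        return sorted;
    }
    const std::size_t nCols = sorted[0].size();
    for (std::size_t c = 0; c < nCols; ++c) {
        // Gather column c, sort it, then write it back.
        std::vector<double> column;
        column.reserve(sorted.size());
        for (const auto &row : sorted) {
            assert(row.size() == nCols);  // all rows must have the same width
            column.push_back(row[c]);
        }
        std::sort(column.begin(), column.end());
        for (std::size_t r = 0; r < sorted.size(); ++r) {
            sorted[r][c] = column[r];
        }
    }
    return sorted;
}
```

Applied to the first three rows of the CSV data above, the first column becomes -86.747071, -75.622534, -55.558252 while the second becomes -28.132241, 61.212279, 63.051427.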
Data Blocks:
One block loaded:
dataSource.loadDataBlock(5);
//dataSource.loadDataBlock(5);
Two blocks loaded:
dataSource.loadDataBlock(5);
dataSource.loadDataBlock(5);
Data is loaded in "blocks" of 5 rows at a time. This makes it easy to section off data for processing, and it is also one way of distributing data over MPI and similar mechanisms.
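The block-at-a-time pattern can be sketched without DAAL as a small CSV reader that returns at most a fixed number of rows per call. The helper loadBlock and the alias Row are hypothetical names for illustration; this is not how FileDataSource is implemented, only a picture of the behavior described here.

```cpp
#include <cassert>
#include <cstddef>
#include <istream>
#include <sstream>
#include <string>
#include <vector>

using Row = std::vector<double>;

// Hypothetical helper (not a DAAL function): read up to maxRows CSV rows
// from the stream. An empty result means the data is exhausted.
std::vector<Row> loadBlock(std::istream &in, std::size_t maxRows) {
    assert(maxRows > 0);
    std::vector<Row> block;
    std::string line;
    // Check the size first so no line is consumed once the block is full.
    while (block.size() < maxRows && std::getline(in, line)) {
        if (line.empty()) {
            continue;  // skip blank lines
        }
        Row row;
        std::istringstream fields(line);
        std::string field;
        while (std::getline(fields, field, ',')) {
            if (!field.empty()) {
                row.push_back(std::stod(field));
            }
        }
        block.push_back(row);
    }
    return block;
}
```

Calling loadBlock(stream, 5) repeatedly walks through the file five rows at a time, and each returned block could be processed locally or handed to a different worker.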
Data Matrix Structure
Code:
/* file: datastructures_soa.cpp */
/*******************************************************************************
! Copyright(C) 2014-2015 Intel Corporation. All Rights Reserved.
*******************************************************************************/
#include "daal.h"
#include "service.h"
using namespace daal;
const char *toString(DataFeatureUtils::FeatureType v);
const char *toString(DataFeatureUtils::InternalNumType v);
int main()
{
std::cout << "Structure of array (SOA) numeric table example" << std::endl << std::endl;
const size_t firstReadRow = 0;
const size_t nRead = 4;
size_t readFeatureIdx;
/*Example of using an SOA numeric table*/
const size_t nObservations = 10;
const size_t nFeatures = 4;
double dDataSOA[nObservations] = {1.0, 1.2, 1.4, 1.6, 1.8, 2.0, 2.2, 2.4, 2.6, 2.8};
float fDataSOA[nObservations] = {3.1f, 3.2f, 3.3f, 3.4f, 3.5f, 3.6f, 3.7f, 3.8f, 3.9f, 4.0f};
int iDataSOA[nObservations] = { -10, -20, -30, -40, -50, -60, -70, -80, -90, -100};
int cDataSOA[nObservations] = {1, 2, 3, 4, 5, 1, 2, 3, 4, 5};
/* Construct an SOA numeric table with nObservations rows and nFeatures columns */
SOANumericTable dataTable(nFeatures, nObservations);
dataTable.setArray<int> (cDataSOA, 0);
dataTable.setArray<float> (fDataSOA, 1);
dataTable.setArray<double>(dDataSOA, 2);
dataTable.setArray<int> (iDataSOA, 3);
/* Read a block of rows */
BlockDescriptor<double> doubleBlock;
dataTable.getBlockOfRows(firstReadRow, nRead, readOnly, doubleBlock);
printArray<double>(doubleBlock.getBlockPtr(), nFeatures, doubleBlock.getNumberOfRows(), "Print SOA data structures as double:");
dataTable.releaseBlockOfRows(doubleBlock);
/* Read a feature (column) in read-only mode */
readFeatureIdx = 0;
BlockDescriptor<int> intBlock;
dataTable.getBlockOfColumnValues(readFeatureIdx, firstReadRow, nObservations, readOnly, intBlock);
printArray<int>(intBlock.getBlockPtr(), 1, intBlock.getNumberOfRows(), "Print the first feature of SOA:");
dataTable.releaseBlockOfColumnValues(intBlock);
/* Get the dictionary and the number of features */
NumericTableDictionary *pDictionary = dataTable.getDictionary();
std::cout << "Number of features in table: " << pDictionary->getNumberOfFeatures() << std::endl;
std::cout << std::endl;
system("pause");
return 0;
}
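For readers new to the layout, a structure-of-arrays (SOA) table keeps each feature in its own array, possibly with its own type, and assembles a block of rows on request by converting the values to one common type. The sketch below shows that idea in plain C++. TinySoaTable and blockOfRows are hypothetical names, not the DAAL SOANumericTable class, and the table is fixed at two features to keep the example short.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical two-feature SOA table; only an illustration of the layout.
struct TinySoaTable {
    std::vector<int> category;  // feature 0, stored as int
    std::vector<float> weight;  // feature 1, stored as float

    std::size_t rows() const { return category.size(); }

    // Copy rows [first, first + count) into a row-major buffer of doubles.
    std::vector<double> blockOfRows(std::size_t first, std::size_t count) const {
        assert(category.size() == weight.size());
        assert(first + count <= rows());
        std::vector<double> block;
        block.reserve(count * 2);
        for (std::size_t r = first; r < first + count; ++r) {
            block.push_back(static_cast<double>(category[r]));
            block.push_back(static_cast<double>(weight[r]));
        }
        return block;
    }
};
```

Reading a single feature is cheap in this layout because the column is already contiguous, while reading rows requires gathering and converting one value from each feature array.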
Compressor
Code:
/* file: compressor.cpp */
/*******************************************************************************
! Copyright(C) 2014-2015 Intel Corporation. All Rights Reserved.
*******************************************************************************/
#include "daal.h"
#include "service.h"
using namespace std;
using namespace daal;
using namespace data_management;
string datasetFileName = "../data/batch/logitboost_train.csv";
DataBlock sentDataStream; /* Data stream to compress and send */
DataBlock uncompressedDataBlock; /* Current block of data stream to compress */
DataBlock compressedDataBlock; /* Current compressed block of data */
DataBlock receivedDataStream; /* Received uncompressed data stream */
queue<DataBlock> sendReceiveQueue; /* Queue for sending and receiving compressed data blocks */
const size_t maxDataBlockSize = 16384; /* Maximum size of a data block */
bool getUncompressedDataBlock(DataBlock &block);
void sendCompressedDataBlock(DataBlock &block);
bool receiveCompressedDataBlock(DataBlock &block);
void prepareMemory();
void releaseMemory();
void printCRC32();
int main(int argc, char *argv[])
{
checkArguments(argc, argv, 1, &datasetFileName);
/* Read data from a file and allocate memory */
prepareMemory();
/* Create a compressor */
Compressor<zlib> compressor;
/* Receive the next data block for compression */
while(getUncompressedDataBlock(uncompressedDataBlock))
{
/* Associate data to compress with the compressor */
compressor.setInputDataBlock(uncompressedDataBlock);
/* Memory for a compressed block might not be enough to compress the input block at once */
do
{
/* Compress uncompressedDataBlock to compressedDataBlock */
compressor.run(compressedDataBlock.getPtr(), maxDataBlockSize, 0);
/* Get the actual size of a compressed block */
compressedDataBlock.setSize(compressor.getUsedOutputDataBlockSize());
/* Send the current compressed block */
sendCompressedDataBlock(compressedDataBlock);
}
/* Check if an additional data block is needed to complete compression */
while (compressor.isOutputDataBlockFull());
}
/* Create a decompressor */
Decompressor<zlib> decompressor;
/* Receive compressed data by blocks */
while(receiveCompressedDataBlock(compressedDataBlock))
{
/* Associate compressed data with the decompressor */
decompressor.setInputDataBlock(compressedDataBlock);
/* Decompress an incoming block to the end of receivedDataStream */
decompressor.run(receivedDataStream.getPtr(), maxDataBlockSize, receivedDataStream.getSize());
/* Update the size of actual data in receivedDataStream */
receivedDataStream.setSize(receivedDataStream.getSize() + decompressor.getUsedOutputDataBlockSize());
}
/* Compute and print checksums for sentDataStream and receivedDataStream */
printCRC32();
releaseMemory();
return 0;
}
void prepareMemory()
{
/* Allocate sentDataStream and read an input file */
byte *data;
sentDataStream.setSize(readTextFile(datasetFileName, &data));
sentDataStream.setPtr(data);
byte *compressedData = (byte *)daal::services::daal_malloc(maxDataBlockSize);
checkAllocation(compressedData);
compressedDataBlock.setPtr(compressedData);
compressedDataBlock.setSize(maxDataBlockSize);
byte *receivedData = (byte *)daal::services::daal_malloc(sentDataStream.getSize());
checkAllocation(receivedData);
receivedDataStream.setPtr(receivedData);
}
bool getUncompressedDataBlock(DataBlock &block)
{
static size_t availableDataSize = sentDataStream.getSize();
/* Calculate the current block size and ptr */
if(availableDataSize >= maxDataBlockSize)
{
block.setSize(maxDataBlockSize);
block.setPtr(sentDataStream.getPtr() + sentDataStream.getSize() - availableDataSize);
availableDataSize -= maxDataBlockSize;
}
else if((availableDataSize < maxDataBlockSize) && (availableDataSize > 0))
{
block.setSize(availableDataSize);
block.setPtr(sentDataStream.getPtr() + sentDataStream.getSize() - availableDataSize);
availableDataSize = 0;
}
else
{
return false;
}
return true;
}
void sendCompressedDataBlock(DataBlock &block)
{
DataBlock currentBlock;
/* Allocate memory for the current compressed block in the queue */
byte *currentPtr = (byte *)daal::services::daal_malloc(block.getSize());
checkAllocation(currentPtr);
currentBlock.setPtr(currentPtr);
/* Copy an incoming block to the current compressed block */
currentBlock.setSize(block.getSize());
copyBytes(currentBlock.getPtr(), block.getPtr(), currentBlock.getSize());
/* Push the current compressed block to the queue */
sendReceiveQueue.push(currentBlock);
return;
}
bool receiveCompressedDataBlock(DataBlock &block)
{
DataBlock currentBlock;
/* Stop at the end of the queue */
if(sendReceiveQueue.empty())
{
return false;
}
/* Receive the current compressed block from the queue */
currentBlock = sendReceiveQueue.front();
block.setSize(currentBlock.getSize());
copyBytes(block.getPtr(), currentBlock.getPtr(), block.getSize());
/* Release memory of the current compressed block in the queue */
daal::services::daal_free(currentBlock.getPtr());
sendReceiveQueue.pop();
return true;
}
void printCRC32()
{
unsigned int crcSentDataStream = 0;
unsigned int crcReceivedDataStream = 0;
/* Compute checksums for full input data and full received data */
crcSentDataStream = getCRC32(sentDataStream.getPtr(), crcSentDataStream, sentDataStream.getSize());
crcReceivedDataStream = getCRC32(receivedDataStream.getPtr(), crcReceivedDataStream, receivedDataStream.getSize());
cout << endl << "Compression example program results:" << endl << endl;
cout << "Input data checksum: 0x" << hex << crcSentDataStream << endl;
cout << "Received data checksum: 0x" << hex << crcReceivedDataStream << endl;
if (sentDataStream.getSize() != receivedDataStream.getSize())
{
cout << "ERROR: Received data size mismatches with the sent data size" << endl;
}
else if (crcSentDataStream != crcReceivedDataStream)
{
cout << "ERROR: Received data CRC mismatches with the sent data CRC" << endl;
}
else
{
cout << "OK: Received data CRC matches with the sent data CRC" << endl;
}
}
void releaseMemory()
{
if(compressedDataBlock.getPtr())
{
daal::services::daal_free(compressedDataBlock.getPtr());
}
if(receivedDataStream.getPtr())
{
daal::services::daal_free(receivedDataStream.getPtr());
}
if(sentDataStream.getPtr())
{
delete [] sentDataStream.getPtr();
}
}
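The example checks the round trip by comparing checksums of the sent and received streams. As a rough illustration of what such a check computes, here is a minimal bitwise CRC-32 in plain C++. It uses the reflected polynomial 0xEDB88320, the variant used by zlib and PNG; whether the getCRC32 helper in the example uses exactly this variant is an assumption. The function name crc32Update is hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Bitwise CRC-32 with the reflected polynomial 0xEDB88320.
// `crc` is the checksum of the data seen so far (0 for the first call),
// so a stream can be checksummed block by block.
std::uint32_t crc32Update(std::uint32_t crc, const unsigned char *data, std::size_t size) {
    assert(data != nullptr || size == 0);
    crc = ~crc;  // restore the internal state
    for (std::size_t i = 0; i < size; ++i) {
        crc ^= data[i];
        for (int bit = 0; bit < 8; ++bit) {
            // Shift out one bit; apply the polynomial if that bit was set.
            crc = (crc & 1u) ? (crc >> 1) ^ 0xEDB88320u : (crc >> 1);
        }
    }
    return ~crc;  // final inversion
}
```

Because the running value is passed back in, the checksum of a whole stream equals the result of feeding its blocks one after another, which mirrors how the example processes data in blocks of maxDataBlockSize bytes.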
Useful Links
- https://software.intel.com/en-us/daal
- Sorting: https://software.intel.com/en-us/node/610127
- Data Structure: https://software.intel.com/en-us/node/599565
- Compressor: https://software.intel.com/en-us/node/599552
- DAAL Guide Book: https://software.intel.com/en-us/daal-programming-guide