Open main menu

CDOT Wiki β

Changes

SLEEPy

13,047 bytes added, 11:11, 12 April 2016
What is DAAL
# [mailto:wdelhia@senecacollege.ca?subject=dps921 Woodson Delhia]
== Intro OLD Introduction ==Local DAAL Examples Location: C:\Program Files (x86)\IntelSWTools\compilers_and_libraries_2016\windows\daal\examples=== What is Big Data ===Big Data: http://open.canadais data that is so big that traditional methods of data processing fail to keep up.ca/A lot of the time this type of data/en/dataset/cad804cd-454e-4bd7-9f22-fcee64f60719 New Data: http://openis related to human behavior measurable by computers.canadaThis maybe trends, web analytics etc.ca/With so many people using technology, data/en/dataset/be3880f2-0d04-4583-8265-611b231ebce8 Parser code: https://softwaregathered this way is becoming massive.intelThis is one of the reasons for the rise of Big Data.com/en-us/node/610127
Low Order Moments[[File: https://softwareViegas-UserActivityonWikipedia.gif|center|500px|Visualization of daily Wikipedia edits created by IBM.intelAt multiple terabytes in size, the text and images of Wikipedia are an example of big data.com/en-us/node/599561]]
Our goal === What is to parse & process this crime data and to add more meaning to said data. Using various parallel techniques taught in the course and comparing them via the DAAL library. == Introduction ==DAAL is a C++ & Java / Scala library for data analytics. It's similar to MKL (Math Kernel Library) with some differences:
* MKL focuses on computation. DAAL focuses on the entire data flow (aquisition, transformation, processing).
* Optimized for all kinds of Intel based devices (from data center to home computers)
* Vectorization method used for optimization
DAAL supports 3 processing modes
* Online Processing (Streaming) - Data is too big for memory, DAAL processes the data in chunks and combine the partial results for the final result.
* Distributed processing - Distributes data processing. DAAL has not bound the communication method and leaves it to the developer (Hadoop, Spark, MPI etc).
 
 
[[File:Daal-flow.png|center|alt=DAAL Data Flow.]]
 
 
 
 
 
[[Image:Graph.PNG|center]]
 
== How DAAL Works ==
 
[[Image:DaalModel.png|center|600px ]]
[[Image:DAALDataSet.PNG|center|450px ]]
 [[Image:DAALDataflow.PNG|center|600px ]] == Installing DAAL == DAAL is already bundle with Intel® Parallel Studio XE 2016 Composer, Professional, or Cluster Edition, but you can also purchase a stand-alone version. To install it, you will have to execute the batch file to setup the environment.  By default, the batch to setup the environment is located at the ''<install dir>'' at ''C:\Program files (x86)\IntelSWTools\compilers_and_libraries_2016.x.xxx\windows''.  <u>'''Set Environment Variables'''</u> Run the ''<install dir>\daal\bin\daalvars.bat'' script as appropriate to your target architecture. ''IA-32 architecture:''  daalvars.bat ia32 ''Intel® 64 architecture:''  daalvars.bat intel64  [[File:DAAL_Environment.png]]  <u>'''Compiling'''</u> Tells the compiler to link with standard threaded Intel DAAL:  /Qdaal or /Qdaal:parallel Tells the compiler to link with sequential version of Intel DAAL:  /Qdaal:sequential Visual Studio: # In Solution Explorer, go to '''Project''' > '''Properties''' > '''Configuration Properties''' > '''Intel Performance Libraries'''.# From the '''Use Intel DAAL''' drop-down menu, select the appropriate linking method. For example: ''Multi-threaded Static Library''.   '''''NOTE:''' DAAL may require you to also enable TBB. ''  [[File:DAAL_VS_Setup.png]]  <u>'''Library Files'''</u> {| class=="wikitable"! ! Single-threaded (non-threaded) Intel DAAL ! Multi-threaded (internally threaded) Intel DAAL |- | ''Static linking'' ||  daal_core.lib daal_sequential.lib || daal_core.lib daal_thread.lib |- | ''Dynamic linking'' ||  daal_core_dll.lib || daal_core_dll.lib |}
== Code Examples ==
=== Batch Sorting ===
CSV Data:
 
<syntaxhighlight lang="c" enclose="none">
-55.558252,63.051427,-27.793776,
-75.622534,61.212279,-16.283311,
-86.747071,-28.132241,-17.824316,
-34.172101,-51.404172,14.670925,
-61.506308,48.248030,-99.235341,
9.746765,-89.879258,55.561778,
48.896723,-32.648097,48.313603,
-15.346015,9.769776,-33.483281,
56.726081,-87.272631,8.724224,
-1.926802,54.960580,-78.723429,
45.237223,-79.764218,-47.271926,
84.138339,11.547818,-92.962952,
46.711824,-42.623510,-34.664673,
55.813112,19.803475,4.807766,
-55.474098,-72.163755,89.425736,
-7.566596,-77.829218,58.630172,
-76.081937,-12.089445,-44.065054,
-25.888944,46.425499,-37.515164,
-30.201387,-16.237217,-50.716591,
-88.085869,60.136249,54.812866
</syntaxhighlight>
 
Code:
<syntaxhighlight lang="c" line="1" >
/* file: sorting_batch.cpp
</syntaxhighlight>
Results: [[File:DAAL-Sort-Batch.PNG|Left|alt=DAAL Sort Batch.]] The data is sorted from smallest to largest per column. Data Blocks:{| |<syntaxhighlight lang="c" line="1">dataSource.loadDataBlock(5);//dataSource.loadDataBlock(5);</syntaxhighlight>|<syntaxhighlight lang="c" line="1">dataSource.loadDataBlock(5);dataSource.loadDataBlock(5);</syntaxhighlight>|-|[[File:DAAL-sort-blocks1.PNG|Center|alt=DAAL Sort Block.]]|[[File:DAAL-sort-blocks2.PNG|Center|alt=DAAL Sort Block.]]|} "Blocks" of data are being loaded 5 rows at a time. This allows us to easily section off data to process.This is also one way of distributing data to MPI etc.  === Data Matrix Structure ===Code:<syntaxhighlight lang="c" line="1" > /* file: datastructures_soa.cpp *//*******************************************************************************! Copyright(C) 2014-2015 Intel Corporation. All Rights Reserved.  #include "daal.h"#include "service.h" using namespace daal; const char *toString(DataFeatureUtils::FeatureType v);const char *toString(DataFeatureUtils::InternalNumType v); int main(){ std::cout << "Structure of array (SOA) numeric table example" << std::endl << std::endl;  const size_t firstReadRow = 0; const size_t nRead = 4; size_t readFeatureIdx;  /*Example of using an SOA numeric table*/ const size_t nObservations = 10; const size_t nFeatures = 4; double dDataSOA[nObservations] = {1.0, 1.2, 1.4, 1.6, 1.8, 2.0, 2.2, 2.4, 2.6, 2.8}; float fDataSOA[nObservations] = {3.1f, 3.2f, 3.3f, 3.4f, 3.5f, 3.6f, 3.7f, 3.8f, 3.9f, 4.0f}; int iDataSOA[nObservations] = { -10, -20, -30, -40, -50, -60, -70, -80, -90, -100}; int cDataSOA[nObservations] = {1, 2, 3, 4, 5, 1, 2, 3, 4, 5};  /* Construct an SOA numeric table with nObservations rows and nFeatures columns */ SOANumericTable dataTable(nFeatures, nObservations); dataTable.setArray<int> (cDataSOA, 0); dataTable.setArray<float> (fDataSOA, 1); dataTable.setArray<double>(dDataSOA, 2); dataTable.setArray<int> (iDataSOA, 3);  /* Read a block of rows */ BlockDescriptor<double> doubleBlock; dataTable.getBlockOfRows(firstReadRow, nRead, readOnly, doubleBlock); printArray<double>(doubleBlock.getBlockPtr(), nFeatures, doubleBlock.getNumberOfRows(), "Print SOA data structures as double:"); dataTable.releaseBlockOfRows(doubleBlock);  /* Read a feature (column) and write a new value into it */ readFeatureIdx = 0; BlockDescriptor<int> intBlock; dataTable.getBlockOfColumnValues(readFeatureIdx, firstReadRow, nObservations, readOnly, intBlock); printArray<int>(intBlock.getBlockPtr(), 1, intBlock.getNumberOfRows(), "Print the first feature of SOA:"); dataTable.releaseBlockOfColumnValues(intBlock);  /* Get the dictionary and the number of features */ NumericTableDictionary *pDictionary = dataTable.getDictionary(); std::cout << "Number of features in table: " << pDictionary->getNumberOfFeatures() << std::endl; std::cout << std::endl;   system("pause"); return 0;} </syntaxhighlight>[[Image:Struct1.PNG|LEFT|600px| ]]  === Compressor ===Code:<syntaxhighlight lang="c" line="1" >/* file: compressor.cpp *//*******************************************************************************! Copyright(C) 2014-2015 Intel Corporation. All Rights Reserved. #include "daal.h"#include "service.h" using namespace std;using namespace daal;using namespace data_management; string datasetFileName = "../data/batch/logitboost_train.csv"; DataBlock sentDataStream; /* Data stream to compress and send */DataBlock uncompressedDataBlock; /* Current block of data stream to compress */DataBlock compressedDataBlock; /* Current compressed block of data */DataBlock receivedDataStream; /* Received uncompressed data stream */ queue<DataBlock> sendReceiveQueue; /* Queue for sending and receiving compressed data blocks */ const size_t maxDataBlockSize = 16384; /* Maximum size of a data block */ bool getUncompressedDataBlock(DataBlock &block);void sendCompressedDataBlock(DataBlock &block);bool receiveCompressedDataBlock(DataBlock &block);void prepareMemory();void releaseMemory();void printCRC32(); int main(int argc, char *argv[]){ checkArguments(argc, argv, 1, &datasetFileName);  /* Read data from a file and allocate memory */ prepareMemory();  /* Create a compressor */ Compressor<zlib> compressor;  /* Receive the next data block for compression */ while(getUncompressedDataBlock(uncompressedDataBlock)) { /* Associate data to compress with the compressor */ compressor.setInputDataBlock(uncompressedDataBlock);  /* Memory for a compressed block might not be enough to compress the input block at once */ do { /* Compress uncompressedDataBlock to compressedDataBlock */ compressor.run(compressedDataBlock.getPtr(), maxDataBlockSize, 0);  /* Get the actual size of a compressed block */ compressedDataBlock.setSize(compressor.getUsedOutputDataBlockSize());  /* Send the current compressed block */ sendCompressedDataBlock(compressedDataBlock); } /* Check if an additional data block is needed to complete compression */ while (compressor.isOutputDataBlockFull()); }  /* Create a decompressor */ Decompressor<zlib> decompressor;  /* Receive compressed data by blocks */ while(receiveCompressedDataBlock(compressedDataBlock)) { /* Associate compressed data with the decompressor */ decompressor.setInputDataBlock(compressedDataBlock);  /* Decompress an incoming block to the end of receivedDataStream */ decompressor.run(receivedDataStream.getPtr(), maxDataBlockSize, receivedDataStream.getSize());  /* Update the size of actual data in receivedDataStream */ receivedDataStream.setSize(receivedDataStream.getSize() + decompressor.getUsedOutputDataBlockSize()); }  /* Compute and print checksums for sentDataStream and receivedDataStream */ printCRC32();  releaseMemory();  return 0;} void prepareMemory(){ /* Allocate sentDataStream and read an input file */ byte *data; sentDataStream.setSize(readTextFile(datasetFileName, &data)); sentDataStream.setPtr(data);  byte *compressedData = (byte *)daal::services::daal_malloc(maxDataBlockSize); checkAllocation(compressedData); compressedDataBlock.setPtr(compressedData); compressedDataBlock.setSize(maxDataBlockSize);  byte *receivedData = (byte *)daal::services::daal_malloc(sentDataStream.getSize()); checkAllocation(receivedData); receivedDataStream.setPtr(receivedData);} bool getUncompressedDataBlock(DataBlock &block){ static size_t availableDataSize = sentDataStream.getSize();  /* Calculate the current block size and ptr */ if(availableDataSize >= maxDataBlockSize) { block.setSize(maxDataBlockSize); block.setPtr(sentDataStream.getPtr() + sentDataStream.getSize() - availableDataSize); availableDataSize -= maxDataBlockSize; } else if((availableDataSize < maxDataBlockSize) && (availableDataSize > 0)) { block.setSize(availableDataSize); block.setPtr(sentDataStream.getPtr() + sentDataStream.getSize() - availableDataSize); availableDataSize = 0; } else { return false; }  return true;} void sendCompressedDataBlock(DataBlock &block){ DataBlock currentBlock;  /* Allocate memory for the current compressed block in the queue */ byte *currentPtr = (byte *)daal::services::daal_malloc(block.getSize()); checkAllocation(currentPtr); currentBlock.setPtr(currentPtr);  /* Copy an incoming block to the current compressed block */ currentBlock.setSize(block.getSize()); copyBytes(currentBlock.getPtr(), block.getPtr(), currentBlock.getSize());  /* Push the current compressed block to the queue */ sendReceiveQueue.push(currentBlock);  return;} bool receiveCompressedDataBlock(DataBlock &block){ DataBlock currentBlock;  /* Stop at the end of the queue */ if(sendReceiveQueue.empty()) { return false; }  /* Receive the current compressed block from the queue */ currentBlock = sendReceiveQueue.front(); block.setSize(currentBlock.getSize()); copyBytes(block.getPtr(), currentBlock.getPtr(), block.getSize());  /* Release memory of the current compressed block in the queue */ daal::services::daal_free(currentBlock.getPtr());  sendReceiveQueue.pop();  return true;} void printCRC32(){ unsigned int crcSentDataStream = 0; unsigned int crcReceivedDataStream = 0;  /* Compute checksums for full input data and full received data */ crcSentDataStream = getCRC32(sentDataStream.getPtr(), crcSentDataStream, sentDataStream.getSize()); crcReceivedDataStream = getCRC32(receivedDataStream.getPtr(), crcReceivedDataStream, receivedDataStream.getSize());  cout << endl << "Compression example program results:" << endl << endl;  cout << "Input data checksum: 0x" << hex << crcSentDataStream << endl; cout << "Received data checksum: 0x" << hex << crcReceivedDataStream << endl;  if (sentDataStream.getSize() != receivedDataStream.getSize()) { cout << "ERROR: Received data size mismatches with the sent data size" << endl; } else if (crcSentDataStream != crcReceivedDataStream) { cout << "ERROR: Received data CRC mismatches with the sent data CRC" << endl; } else { cout << "OK: Received data CRC matches with the sent data CRC" << endl; }} void releaseMemory(){ if(compressedDataBlock.getPtr()) { daal::services::daal_free(compressedDataBlock.getPtr()); } if(receivedDataStream.getPtr()) { daal::services::daal_free(receivedDataStream.getPtr()); } if(sentDataStream.getPtr()) { delete [] sentDataStream.getPtr(); }}</syntaxhighlight> [[Image:DAAL_Compressor.png|Left|DAAL Compressor]]  == Useful Link Links ==
# https://software.intel.com/en-us/daal
# Sorting: https://software.intel.com/en-us/node/610127
# Data Structure: https://software.intel.com/en-us/node/599565
# Compressor: https://software.intel.com/en-us/node/599552
# DAAL Guide Book: https://software.intel.com/en-us/daal-programming-guide