// Code:
#include <pthread.h>
#include <sched.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#include <cmath>
#include <ctime>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <string>
#include <vector>

using namespace std;
// Calendar date attached to a sale record.
struct Date
{
    int day;   // day of the month, 1-30
    int month; // month of the year, 1-12
    int year;  // two-digit year, e.g. 06
};
// One sale record; this is the element type stored in the shared buffer.
struct Item
{
    Date SalesDate;    // date of the sale
    int storeID;       // store that made the sale, 1-p
    int registerNum;   // register within the store, 1-10
    double saleAmount; // value of the sale, 0.50 to 999.99
};
// Thread entry points.
void *produce(void *threadid); // body run by every producer thread
void *consume(void *threadid); // body run by every consumer thread

// Wait operation used by consumers; also receives the consumer's local totals.
void cwait(int& S,
           int threadid,
           vector<int>& waiting,
           double l_Aggregate,
           double l_Store_Wide[],
           double l_Month_Wise[]);

// Wait operation used by producers.
void pwait(int& S, int threadid, vector<int>& waiting);

// Signal operation shared by producers and consumers.
void signal(int& S);

// Prints one consumer's local statistics.
void printStatistics(double l_Aggregate,
                     double l_Store_Wide[],
                     double l_Month_Wise[],
                     int threadid);

// Builds a new item filled with random values.
Item produceItem(void *t);
// SHARED VARIABLES (visible to main and to every producer/consumer thread)

// Record counters and buffer cursors.
int numOfRecords;     // records produced so far
int numOfRecordsRead; // records consumed so far
int in;               // buffer index the next produced item is written to
int out;              // buffer index the next consumed item is read from

// Run configuration, read from the input file in main.
int p; // number of producers
int c; // number of consumers
int b; // buffer size

// Semaphore counters; main starts them at b, 0 and 1 respectively.
int empty;
int full;
int mutex;

// Thread IDs queued on each semaphore, in arrival order.
vector<int> ewaiting; // waiting on empty
vector<int> fwaiting; // waiting on full
vector<int> mwaiting; // waiting on mutex

// Count of consumers that have printed their statistics; main compares it with c.
int consumersDone;

// The bounded buffer of b items shared by producers and consumers.
vector<Item> buffer;

// Global totals accumulated from the consumers' local totals.
vector<double> Store_Wide; // total sales per store
double Month_Wise[12];     // total sales per month, across all stores
double Aggregate;          // all sales together

// Total simulation time from begin to end, in seconds.
double Time;
int main(int argc, char** argv)
{
//start time
srand(time(NULL));
clock_t start, stop;
start = clock() + CLK_TCK;
ifstream inFile;
string fileName = argv[1];
inFile.open(fileName.c_str());
inFile >> p >> c >> b;
inFile.close();
// BEGIN INITIALIZATIONS
Item emptyItem;
emptyItem.SalesDate.day = 0;
emptyItem.SalesDate.month = 0;
emptyItem.SalesDate.year = 0;
emptyItem.storeID = 0;
emptyItem.registerNum = 0;
emptyItem.saleAmount = 0;
for (int i = 0; i < b; i++)
{
buffer.push_back(emptyItem);
}
bool allConsumersDone = false;
numOfRecords = 0;
numOfRecordsRead = 0;
in = 0;
out = 0;
empty = b;
full = 0;
mutex = 1;
Aggregate = 0;
consumersDone = 0;
for (int i = 0; i < p; i++)
{
Store_Wide.push_back(0);
}
for (int i = 0; i < 12; i++)
{
Month_Wise[i] = 0;
}
// END INITIALIZATIONS
//BEGIN THREAD CREATION
pthread_t p_threads[p], c_threads[c];
int rc, t;
for(t=1; t < (p+1); t++)
{
rc = pthread_create(&p_threads[t], NULL, produce, (void *)t);
if (rc)
{
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
for(t=1+p; t < (c+p+1); t++)
{
rc = pthread_create(&c_threads[t], NULL, consume, (void *)t);
if (rc)
{
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(-1);
}
}
//END THREAD CREATION
while (!allConsumersDone) //main waits until producer/consumer threads are finished executing
{
if (consumersDone == c)
allConsumersDone = true;
};
//print global statistics and time
cout << "GLOBAL STATISTICS" << endl;
cout << "Global Aggregate: " << Aggregate << endl;
cout << "Global Store Wide" << endl;
for (int i = 0; i < p; i++)
{
cout << "Store ID: " << (i + 1) << endl;
cout << "Global Sales: " << Store_Wide[i] << endl;
}
cout << "Global Month Wise" << endl;
for (int i = 0; i < 12; i++)
{
cout << "Month: " << (i + 1) << endl;
cout << "Global Sales: " << Month_Wise[i] << endl;
}
stop = clock()+CLK_TCK;
Time = ((double) stop-start)/1000;
cout << "Runtime: " << Time << " seconds" << endl << endl;
pthread_exit(NULL);
return 0;
}
void *produce(void *threadid) //production thread function
{
while (numOfRecords < 50)
{
Item nextProduced = produceItem(threadid); //produce item
pwait(empty, (int) threadid, ewaiting);
pwait(mutex, (int) threadid, mwaiting);
//add item to buffer
buffer[in] = nextProduced;
in = ( in + 1) % b;
numOfRecords++;
signal(mutex);
signal(full);
int sleepTime = rand() % 36 + 5;
// sleep(sleepTime); //sleep for 5-40 ms
}
pthread_exit(NULL);
}
void *consume(void *threadid) //consumption thread function
{
double l_Store_Wide[p]; //Store-wide total sales (LOCAL)
double l_Month_Wise[12]; //Month-wise total sales (in all stores) (LOCAL)
double l_Aggregate = 0; // Aggregate sales (all sales together) (LOCAL)
//initialize above variables
for (int i = 0; i < p; i++)
{
l_Store_Wide[i] = 0;
}
for (int i = 0; i < 12; i++)
{
l_Month_Wise[i] = 0;
}
while (numOfRecordsRead < 50)
{
cwait(full, (int) threadid, fwaiting, l_Aggregate, l_Store_Wide, l_Month_Wise);
cwait(mutex, (int) threadid, mwaiting, l_Aggregate, l_Store_Wide, l_Month_Wise);
//remove item from buffer to nextConsumed
Item nextConsumed = buffer[out];
//create blank item to replace consumed item in buffer
Item emptyItem;
emptyItem.SalesDate.day = 0;
emptyItem.SalesDate.month = 0;
emptyItem.SalesDate.year = 0;
emptyItem.storeID = 0;
emptyItem.registerNum = 0;
emptyItem.saleAmount = 0;
buffer[out] = emptyItem;
out = (out + 1) % b;
numOfRecordsRead++;
signal(mutex);
signal(empty);
//consume item in nextConsumed/ record statistics
l_Aggregate = l_Aggregate + nextConsumed.saleAmount;
l_Store_Wide[nextConsumed.storeID - 1] = l_Store_Wide[nextConsumed.storeID - 1] + nextConsumed.saleAmount;
l_Month_Wise[nextConsumed.SalesDate.month - 1] = l_Month_Wise[nextConsumed.SalesDate.month - 1] + nextConsumed.saleAmount;
}
//print local statistics
printStatistics(l_Aggregate, l_Store_Wide, l_Month_Wise, (int) threadid);
}
// Serialises every access that cwait, pwait and signal make to a semaphore
// counter or to a waiting queue. Without it, concurrent push_back/erase calls
// on the same vector and concurrent S++/S-- are data races.
static pthread_mutex_t semaphoreLock = PTHREAD_MUTEX_INITIALIZER;

// Appends threadid to the back of the waiting queue.
static void joinQueue(vector<int>& waiting, int threadid)
{
    pthread_mutex_lock(&semaphoreLock);
    waiting.push_back(threadid);
    pthread_mutex_unlock(&semaphoreLock);
}

// Removes the first occurrence of threadid from the waiting queue, if any.
static void leaveQueue(vector<int>& waiting, int threadid)
{
    pthread_mutex_lock(&semaphoreLock);
    for (vector<int>::iterator it = waiting.begin(); it != waiting.end(); ++it)
    {
        if (*it == threadid)
        {
            waiting.erase(it);
            break;
        }
    }
    pthread_mutex_unlock(&semaphoreLock);
}

// Makes one attempt to take a permit from S. Succeeds only when S is positive
// and threadid is at the front of the queue; on success S is decremented and
// the thread is removed from the queue. Returns whether the permit was taken.
static bool tryAcquire(int& S, int threadid, vector<int>& waiting)
{
    bool acquired = false;
    pthread_mutex_lock(&semaphoreLock);
    if (S > 0 && !waiting.empty() && waiting[0] == threadid)
    {
        S--;
        waiting.erase(waiting.begin());
        acquired = true;
    }
    pthread_mutex_unlock(&semaphoreLock);
    return acquired;
}

// Wait operation for consumers.
//
// Queues threadid on `waiting` and polls until a permit can be taken from S
// in first-come-first-served order. If 50 records have already been consumed
// while the thread is still waiting, it leaves the queue and calls
// printStatistics with the supplied local totals, which ends the thread.
//
// NOTE(review): numOfRecordsRead is read here without holding the lock that
// its writers use; that counter is updated outside this block.
void cwait(int& S, int threadid, vector<int>& waiting, double l_Aggregate, double l_Store_Wide[], double l_Month_Wise[])
{
    joinQueue(waiting, threadid);
    while (!tryAcquire(S, threadid, waiting))
    {
        if (numOfRecordsRead >= 50)
        {
            // Leave the queue first: a stale ID at its front would block
            // every thread queued behind it forever.
            leaveQueue(waiting, threadid);
            printStatistics(l_Aggregate, l_Store_Wide, l_Month_Wise, threadid); // print local statistics
            pthread_exit(NULL); // printStatistics already ends the thread; kept as a safeguard
        }
        sched_yield(); // give other threads a chance to run while polling
    }
}

// Wait operation for producers.
//
// Queues threadid on `waiting` and polls until a permit can be taken from S
// in first-come-first-served order. If 50 records have already been produced
// while the thread is still waiting, it leaves the queue and terminates.
//
// NOTE(review): numOfRecords is read here without holding the lock that its
// writers use; that counter is updated outside this block.
void pwait(int& S, int threadid, vector<int>& waiting)
{
    joinQueue(waiting, threadid);
    while (!tryAcquire(S, threadid, waiting))
    {
        if (numOfRecords >= 50)
        {
            // Leave the queue first: a stale ID at its front would block
            // every thread queued behind it forever.
            leaveQueue(waiting, threadid);
            pthread_exit(NULL); // terminate thread
        }
        sched_yield(); // give other threads a chance to run while polling
    }
}

// Signal operation: returns one permit to S.
void signal(int& S)
{
    pthread_mutex_lock(&semaphoreLock);
    S++;
    pthread_mutex_unlock(&semaphoreLock);
}
// Builds a new sale record with random contents.
//
// t carries the producing thread's integer ID inside the pointer value; it
// becomes the item's store ID. The day is drawn from 1-30, the month from
// 1-12, the register from 1-10 and the amount from 0.50-999.99; the year is
// always 6.
Item produceItem(void *t)
{
    Item newItem;
    newItem.SalesDate.day = rand() % 30 + 1;
    newItem.SalesDate.month = rand() % 12 + 1;
    newItem.SalesDate.year = 6;
    // Recover the ID through intptr_t: casting a pointer straight to int is
    // rejected where pointers are wider than int.
    newItem.storeID = static_cast<int>(reinterpret_cast<intptr_t>(t)); // producerThreadID
    newItem.registerNum = rand() % 10 + 1;
    // rand() * range / RAND_MAX + low
    newItem.saleAmount = static_cast<double>(rand()) * 999.49 / static_cast<double>(RAND_MAX) + 0.5;
    return newItem;
}
// Prints one consumer's local statistics, adds them to the global totals
// (Aggregate, Store_Wide, Month_Wise), counts the consumer as done and then
// terminates the calling thread. This function does not return.
//
// l_Aggregate  - the consumer's aggregate sales
// l_Store_Wide - the consumer's per-store totals; p entries are read
// l_Month_Wise - the consumer's per-month totals; 12 entries are read
// threadid     - ID printed as the consumer ID
void printStatistics(double l_Aggregate, double l_Store_Wide[], double l_Month_Wise[], int threadid)
{
    // Several consumers reach this point concurrently. The lock keeps each
    // report contiguous on the output and makes the read-modify-write updates
    // of the global totals and of consumersDone safe.
    static pthread_mutex_t statsLock = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_lock(&statsLock);

    cout << "LOCAL STATISTICS" << endl;
    cout << "Consumer ID: " << threadid << endl;
    cout << "Local Aggregate: " << l_Aggregate << endl;
    Aggregate = Aggregate + l_Aggregate;

    cout << "Local Store Wide" << endl;
    for (int i = 0; i < p; i++)
    {
        cout << "Store ID: " << (i + 1) << endl;
        cout << "Local Sales: " << l_Store_Wide[i] << endl;
        Store_Wide[i] = Store_Wide[i] + l_Store_Wide[i];
    }

    cout << "Local Month Wise" << endl;
    for (int i = 0; i < 12; i++)
    {
        cout << "Month: " << (i + 1) << endl;
        cout << "Local Sales: " << l_Month_Wise[i] << endl;
        Month_Wise[i] = Month_Wise[i] + l_Month_Wise[i];
    }

    consumersDone++;
    pthread_mutex_unlock(&statsLock);
    pthread_exit(NULL);
}