Thread: Multi-Threading and ADO

  1. #16
    Registered User
    Join Date
    Apr 2004
    Posts
    10
    Thanks for all your reply guys. I have modified the same code to run through till the recordsets are printed (basically will make calls later). now all my threads are able to access the recordsets, but still I get some unsatisfactory results.

    Code:
    #import "C:\Program files\Common Files\System\Ado\msado15.dll" no_namespace rename("EOF", "ADOEOF")
    #include <ole2.h>
    #include <stdio.h>
    #include <windows.h>
    #define NUM_THREADS 2
    
    CRITICAL_SECTION cs; 
    HRESULT hr;
    DWORD dwRsCookie;
    IGlobalInterfaceTable * g_pGIT;
    /*
       This function is used to make call in the future. for testing Iam 
       giving some delay here
    */
    int ProcessFunc(){
    	int a,b,j,k;
    	for(a=0;a<2000000;a++){
    		j=j++;
    		k=k++;
    	}
    	for(b=0;b<2000000;b++){
    		j=j--;
    		k=k--;
    	}
    
    	return 0;
    }
    
    /*
        ThreadFunc is responsible for accessing the Recordset. and moving through
    	recordset.
    */
    DWORD WINAPI ThreadFunc(LPVOID lpParam){
    	try{
    		printf("Thread started \n");
    		CoInitialize(NULL);
    		_RecordsetPtr pRs;
    
    		IGlobalInterfaceTable *lpGIT;
    		CoCreateInstance(CLSID_StdGlobalInterfaceTable, NULL,CLSCTX_INPROC_SERVER,
    		IID_IGlobalInterfaceTable, (void **)&lpGIT); 
    		lpGIT->GetInterfaceFromGlobal(dwRsCookie,__uuidof(_Recordset),(void**)&pRs);
    
    		if(pRs->ADOEOF){
    			printf("The Recordset Object is empty \n");
    			exit(0);
    		}
    		while(!pRs->ADOEOF){
    			EnterCriticalSection(&cs);
    			//_bstr_t val = pRs->Fields->GetItem(_variant_t("Npa_num"))->Value;
    			_bstr_t val = pRs->Fields->Item[_variant_t("BTN")]->Value;
    			pRs->MoveNext();
    			LeaveCriticalSection(&cs);
    			ProcessFunc();
    			printf("%s \n",(char*)val);
    		}		
    		lpGIT->Release();
    		CoUninitialize();
    		
    	}catch(_com_error &e){
    		printf("\tCode = %08lx\n", e.Error());
    		printf("\tCode meaning = %s\n", e.ErrorMessage());
    		return 0;
    	}
    	return 0;
    }
    
    int main(){
    	DWORD dwThreadId;
    	HANDLE hThread[NUM_THREADS];
    	int n;    
    	
    	CoInitialize(NULL);
    	_ConnectionPtr pConn;
    	_RecordsetPtr pRs;
    
    	try{
    		hr = pConn.CreateInstance(__uuidof(Connection));
    		hr = pRs.CreateInstance(__uuidof(Recordset));
            _bstr_t strConn("Provider=sqloledb;server=ALBHM01WSINF005;Trusted_Connection=yes;database=Core;");
            pConn->Open(strConn,"","",adConnectUnspecified);
    		//pRs->Open("SELECT top 10 Npa_Num from Npa (NOLOCK)", pConn.GetInterfacePtr(), adOpenForwardOnly, adLockReadOnly, adCmdText);
            pRs->Open("SELECT BTN from mbs_GetSpitFireEasyPay (NOLOCK)", pConn.GetInterfacePtr(), adOpenForwardOnly, adLockReadOnly, adCmdText);
    
    	}catch(_com_error &e){
    		printf("Error\n");
    		printf("\tCode meaning = %s", e.ErrorMessage());
    	}
    	CoUninitialize();
    	IGlobalInterfaceTable *lpGIT;
    	CoCreateInstance(
    		CLSID_StdGlobalInterfaceTable, 
    		NULL,CLSCTX_INPROC_SERVER,
    		IID_IGlobalInterfaceTable,
    		(void **)&lpGIT); 
    	lpGIT->RegisterInterfaceInGlobal(pRs,__uuidof(_Recordset)  ,&dwRsCookie);
    	InitializeCriticalSection(&cs);
    
    	// create all the threads 
    	for (n = 0; n < NUM_THREADS; n++){
    		hThread[n] = CreateThread(NULL, 0, ThreadFunc, &pRs, 0, &dwThreadId);  
    		if (hThread == NULL)
    			fprintf(stderr, "Failed to create thread# %d", n);
    	}//for
    
    	for (n = 0; n < NUM_THREADS; n++){
            if (hThread[n] != NULL)
                WaitForSingleObject(hThread[n], INFINITE);
    	}//for*/
    	lpGIT->Release();
    	DeleteCriticalSection(&cs);
    	printf("This is after all threads exited\n"); 
    	return 0;
    }
    This works fine. I introduced a simple delay in ProcessFunc(). but the problem is sometimes I see some records are repeated eventhough I am moving my recordset objects in CriticalSection only. also sometimes I see some records are missed.

    for example if threads are less than recordsets and when introduced less delay, I see some recordsets repeated.

    If threads are more than the recordsets, I see few records are missing in the result.

    but with this code I can clearly see all the threads starting simultaneously move through recordsets and if one job is finished, the same thread gets the next job.

    please advise, how I can prevent the runtime error.

    would there be a problem in the recordset object I am passing?

    or do I need to introduce some WaitFunctions in my code, to avoid this

  2. #17
    Yes, my avatar is stolen anonytmouse's Avatar
    Join Date
    Dec 2002
    Posts
    2,544
    OK, first problem is that you are calling CoUninitialize() too early. You can only call CoUninitialize() after you have finished with all COM objects, typically it is placed just before the return statement.

    Secondly, I think you need to run a message loop, at least in your main thread, to allow COM internal messages to be dealt with.
    Change:
    Code:
    	for (n = 0; n < NUM_THREADS; n++){
            if (hThread[n] != NULL)
                WaitForSingleObject(hThread[n], INFINITE);
    	}//for*/
    to:
    Code:
    	for (n = 0; n < NUM_THREADS; n++)
    	{
    		if (hThread[n] != NULL)
    		{
    			while (MsgWaitForMultipleObjects(1, &hThread[n], TRUE,
    	                                                 INFINITE, QS_ALLEVENTS) == WAIT_OBJECT_0 + 1);
    			{
    				MSG msg;
    				while (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE))
    				{
    					TranslateMessage(&msg);
    					DispatchMessage(&msg);
    				}
    			}
    		}
    	}
    This will pump the message loop while waiting for the threads to complete. Otherwise, figure out some other way to run a message loop that suits your architecture.

    I'm not sure that either of these issues are causing your problems but fixing them should be a start. Post the corrected code if the problems remain.

    EDIT:
    Thirdly, the EOF check needs to be in the critical section.

    Change:
    Code:
    		while(!pRs->ADOEOF){
    			EnterCriticalSection(&cs);
    			//_bstr_t val = pRs->Fields->GetItem(_variant_t("Npa_num"))->Value;
    			_bstr_t val = pRs->Fields->Item[_variant_t("BTN")]->Value;
    			pRs->MoveNext();
    			LeaveCriticalSection(&cs);
    			ProcessFunc();
    			printf("%s \n",(char*)val);
    		}
    to:
    Code:
    		while(TRUE)
    		{
    			EnterCriticalSection(&cs);
    			if (pRs->ADOEOF) break;
    			//_bstr_t val = pRs->Fields->GetItem(_variant_t("Npa_num"))->Value;
    			_bstr_t val = pRs->Fields->Item[_variant_t("BTN")]->Value;
    			pRs->MoveNext();
    			LeaveCriticalSection(&cs);
    			ProcessFunc();
    			printf("%s \n",(char*)val);
    		}
    This will avoid the following scenario:

    Thread A: Check EOF
    Thread B: Move Next
    Thread A: Get value - Oops, the action of thread B means we are now EOF.
    Last edited by anonytmouse; 05-12-2004 at 11:58 PM.

  3. #18
    Registered User
    Join Date
    Apr 2004
    Posts
    10
    anonytmouse,

    I made the modification suggested by you. I am now facing 2 problems

    1. sometimes I get only one record being printed twice (This happens first time when I build and atleast once in 10 attempts).

    2. The second problem is the main thread doesnt exits (it hangs on). I think my ThreadFunc is not exiting properly.

    before I used to do this before the while loop
    Code:
    if(pRs->ADOEOF){
    	printf("The Recordset Object is empty \n");
    	exit(0);
    }
    now after the modifications suggested by you my code looks like this

    Code:
    #import "C:\Program files\Common Files\System\Ado\msado15.dll" no_namespace rename("EOF", "ADOEOF")
    #include <ole2.h>
    #include <stdio.h>
    #include <windows.h>
    #define NUM_THREADS 48
    
    CRITICAL_SECTION cs; 
    HRESULT hr;
    DWORD dwRsCookie;
    IGlobalInterfaceTable * g_pGIT;
    /*
       This function is used to make call in the future. for testing Iam 
       giving some delay here
    */
    int ProcessFunc(){
    	int a,b,j,k;
    	for(a=0;a<2000;a++){
    		j=j++;
    		k=k++;
    	}
    	for(b=0;b<20000000;b++){
    		j=j--;
    		k=k--;
    	}
    
    	return 0;
    }
    
    /*
        ThreadFunc is responsible for accessing the Recordset. and moving through
    	recordset.
    */
    DWORD WINAPI ThreadFunc(LPVOID lpParam){
    	try{int i = 0;
    		printf("Thread started \n");
    		CoInitialize(NULL);
    		_RecordsetPtr pRs;
    
    		IGlobalInterfaceTable *lpGIT;
    		CoCreateInstance(CLSID_StdGlobalInterfaceTable, NULL,CLSCTX_INPROC_SERVER,
    		IID_IGlobalInterfaceTable, (void **)&lpGIT); 
    		lpGIT->GetInterfaceFromGlobal(dwRsCookie,__uuidof(_Recordset),(void**)&pRs);
    
    		while(TRUE){
    			EnterCriticalSection(&cs);
    			if (pRs->ADOEOF) break;
    			_bstr_t val = pRs->Fields->GetItem(_variant_t("Npa_num"))->Value;
    			//_bstr_t val = pRs->Fields->Item[_variant_t("BTN")]->Value;
    			pRs->MoveNext();
    			LeaveCriticalSection(&cs);
    			ProcessFunc();
    			printf("%d %s \n",i++ ,(char*)val);
    		}		
    		lpGIT->Release();
    		
    	}catch(_com_error &e){
    		printf("\tCode = %08lx\n", e.Error());
    		printf("\tCode meaning = %s\n", e.ErrorMessage());
    		return 0;
    	}
    	CoUninitialize();
    	return 0;
    }
    
    int main(){
    	DWORD dwThreadId;
        HANDLE hThread[NUM_THREADS];
        int n;    
    	
    	CoInitialize(NULL);
    	_ConnectionPtr pConn;
    	_RecordsetPtr pRs;
    
    	try{
    		hr = pConn.CreateInstance(__uuidof(Connection));
    		hr = pRs.CreateInstance(__uuidof(Recordset));
            _bstr_t strConn("Provider=sqloledb;server=ALBHM01WSINF005;Trusted_Connection=yes;database=Core;");
            pConn->Open(strConn,"","",adConnectUnspecified);
    		pRs->Open("SELECT top 25 Npa_Num from Npa (NOLOCK)", pConn.GetInterfacePtr(), adOpenForwardOnly, adLockReadOnly, adCmdText);
            //pRs->Open("SELECT BTN from mbs_GetSpitFireEasyPay (NOLOCK)", pConn.GetInterfacePtr(), adOpenForwardOnly, adLockReadOnly, adCmdText);
    
    	}catch(_com_error &e){
    		printf("Error\n");
    		printf("\tCode meaning = %s", e.ErrorMessage());
    	}
    	CoUninitialize();
    	IGlobalInterfaceTable *lpGIT;
    	CoCreateInstance(
    		CLSID_StdGlobalInterfaceTable, 
    		NULL,CLSCTX_INPROC_SERVER,
    		IID_IGlobalInterfaceTable,
    		(void **)&lpGIT); 
    	lpGIT->RegisterInterfaceInGlobal(pRs,__uuidof(_Recordset)  ,&dwRsCookie);
    	InitializeCriticalSection(&cs);
    
    	// create all the threads 
        for (n = 0; n < NUM_THREADS; n++){
            hThread[n] = CreateThread(NULL, 0, ThreadFunc, &pRs, 0, &dwThreadId);  
            if (hThread == NULL)
                fprintf(stderr, "Failed to create thread# %d", n);
        }//for
    	for (n = 0; n < NUM_THREADS; n++){
    		if (hThread[n] != NULL){
    			while (MsgWaitForMultipleObjects(1, &hThread[n], TRUE,INFINITE, QS_ALLEVENTS) == WAIT_OBJECT_0 + 1);{
    				MSG msg;
    				while (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE)){
    					TranslateMessage(&msg);
    					DispatchMessage(&msg);
    				}
    			}
    		}
    	}
    	lpGIT->Release();
    	DeleteCriticalSection(&cs);
    	printf("This is after all threads exited\n"); 
    	return 0;
    }
    for some reason sometimes there is a duplication in records. I dont know why my main thread is not exiting after printing all the records

  4. #19
    Yes, my avatar is stolen anonytmouse's Avatar
    Join Date
    Dec 2002
    Posts
    2,544
    You are still calling CoUninitialize() in the middle of your main thread.

    Delete it from here:
    Code:
    	}
    	CoUninitialize();
    	IGlobalInterfaceTable *lpGIT;
    and put it here:
    Code:
    	printf("This is after all threads exited\n"); 
    	CoUninitialize();
    	return 0;
    >> 2. The second problem is the main thread doesnt exits (it hangs on). <<

    Sorry, there was a mistake in the code I posted. You also have an extra semi-colon that shouldn't be there.

    Code:
    while (MsgWaitForMultipleObjects(1, &hThread[n], TRUE,INFINITE, QS_ALLEVENTS) == WAIT_OBJECT_0 + 1);{
    should be:
    Code:
    // Pass FALSE instead of TRUE and remove extra semi-colon.
    while (MsgWaitForMultipleObjects(1, &hThread[n], FALSE,INFINITE, QS_ALLEVENTS) == WAIT_OBJECT_0 + 1) {
    Also, we are not releasing the critical section if ADOEOF.

    Change:
    Code:
    			EnterCriticalSection(&cs);
    			if (pRs->ADOEOF) break;
    			_bstr_t val = pRs->Fields->GetItem(_variant_t("Npa_num"))->Value;
    			//_bstr_t val = pRs->Fields->Item[_variant_t("BTN")]->Value;
    			pRs->MoveNext();
    			LeaveCriticalSection(&cs);
    to:
    Code:
    			EnterCriticalSection(&cs);
    			if (pRs->ADOEOF)
    			{
    				LeaveCriticalSection(&cs);
    				break;
    			}
    			_bstr_t val = pRs->Fields->GetItem(_variant_t("Npa_num"))->Value;
    			//_bstr_t val = pRs->Fields->Item[_variant_t("BTN")]->Value;
    			pRs->MoveNext();
    			LeaveCriticalSection(&cs);
    >> 1. sometimes I get only one record being printed twice (This happens first time when I build and atleast once in 10 attempts). <<

    See if the above fixes remove the problem. If it still occurs is there any pattern? Is it always the first or last records, or is it an arbitary record that gets repeated?

    EDIT:
    Here is the code with the above corrections incorporated:
    Code:
    #import "C:\Program files\Common Files\System\Ado\msado15.dll" no_namespace rename("EOF", "ADOEOF")
    #include <ole2.h>
    #include <stdio.h>
    #include <windows.h>
    #define NUM_THREADS 48
    
    CRITICAL_SECTION cs; 
    HRESULT hr;
    DWORD dwRsCookie;
    
    /*
       This function is used to make call in the future. for testing Iam 
       giving some delay here
    */
    int ProcessFunc(){
    	int a,b,j,k;
    	for(a=0;a<2000;a++){
    		j=j++;
    		k=k++;
    	}
    	for(b=0;b<20000000;b++){
    		j=j--;
    		k=k--;
    	}
    
    	return 0;
    }
    
    /*
        ThreadFunc is responsible for accessing the Recordset. and moving through
    	recordset.
    */
    DWORD WINAPI ThreadFunc(LPVOID lpParam){
    	try
    	{
    		int i = 0;
    		printf("Thread started \n");
    		CoInitialize(NULL);
    		_RecordsetPtr pRs;
    
    		IGlobalInterfaceTable *lpGIT;
    		CoCreateInstance(CLSID_StdGlobalInterfaceTable, NULL,CLSCTX_INPROC_SERVER,
    		IID_IGlobalInterfaceTable, (void **)&lpGIT); 
    		lpGIT->GetInterfaceFromGlobal(dwRsCookie,__uuidof(_Recordset),(void**)&pRs);
    
    		while(TRUE){
    			EnterCriticalSection(&cs);
    			if (pRs->ADOEOF)
    			{
    				LeaveCriticalSection(&cs);
    				break;
    			}
    			_bstr_t val = pRs->Fields->GetItem(_variant_t("Npa_num"))->Value;
    			//_bstr_t val = pRs->Fields->Item[_variant_t("BTN")]->Value;
    			pRs->MoveNext();
    			LeaveCriticalSection(&cs);
    
    			ProcessFunc();
    			printf("%d %s \n",i++ ,(char*)val);
    		}		
    		lpGIT->Release();
    		
    	}catch(_com_error &e){
    		printf("\tCode = %08lx\n", e.Error());
    		printf("\tCode meaning = %s\n", e.ErrorMessage());
    	}
    	CoUninitialize();
    	return 0;
    }
    
    int main(){
    	DWORD dwThreadId;
        HANDLE hThread[NUM_THREADS];
        int n;    
    	
    	CoInitialize(NULL);
    	_ConnectionPtr pConn;
    	_RecordsetPtr pRs;
    
    	try{
    		hr = pConn.CreateInstance(__uuidof(Connection));
    		hr = pRs.CreateInstance(__uuidof(Recordset));
            _bstr_t strConn("Provider=sqloledb;server=ALBHM01WSINF005;Trusted_C  onnection=yes;database=Core;");
            pConn->Open(strConn,"","",adConnectUnspecified);
    		pRs->Open("SELECT top 25 Npa_Num from Npa (NOLOCK)", pConn.GetInterfacePtr(), adOpenForwardOnly, adLockReadOnly, adCmdText);
            //pRs->Open("SELECT BTN from mbs_GetSpitFireEasyPay (NOLOCK)", pConn.GetInterfacePtr(), adOpenForwardOnly, adLockReadOnly, adCmdText);
    
    	}catch(_com_error &e){
    		printf("Error\n");
    		printf("\tCode meaning = %s", e.ErrorMessage());
    	}
    
    	IGlobalInterfaceTable *lpGIT;
    	CoCreateInstance(
    		CLSID_StdGlobalInterfaceTable, 
    		NULL,CLSCTX_INPROC_SERVER,
    		IID_IGlobalInterfaceTable,
    		(void **)&lpGIT); 
    	lpGIT->RegisterInterfaceInGlobal(pRs,__uuidof(_Recordset),&dwRsCookie);
    	InitializeCriticalSection(&cs);
    
    	// create all the threads 
    	for (n = 0; n < NUM_THREADS; n++){
    		hThread[n] = CreateThread(NULL, 0, ThreadFunc, &pRs, 0, &dwThreadId);  
    		if (hThread[n] == NULL)
    			fprintf(stderr, "Failed to create thread# %d", n);
    	}//for
    
    	for (n = 0; n < NUM_THREADS; n++){
    		if (hThread[n] != NULL){
    			while (MsgWaitForMultipleObjects(1, &hThread[n], FALSE,INFINITE, QS_ALLEVENTS) == WAIT_OBJECT_0 + 1) {
    				MSG msg;
    				while (PeekMessage(&msg, NULL, 0, 0, PM_REMOVE)){
    					TranslateMessage(&msg);
    					DispatchMessage(&msg);
    				}
    			}
    		}
    	}
    	lpGIT->Release();
    	DeleteCriticalSection(&cs);
    	printf("This is after all threads exited\n"); 
    	CoUninitialize();
    	return 0;
    }
    Last edited by anonytmouse; 05-14-2004 at 10:16 PM.

  5. #20
    Registered User
    Join Date
    Apr 2004
    Posts
    10
    GREAT!!!!. The code now works fine. Thanks for all your help. I appreciate it

Popular pages Recent additions subscribe to a feed