The following is a simple example of how to use Teradata PT in a parallel environment. The code is not intended for production use; it highlights the key steps of the Teradata PT parallel process. The architecture of the example can be implemented in either a multi-threaded or a multi-process environment.
This is a multi-instance example using the Load driver. The example is built around two kinds of instance: a main instance and one or more worker instances.
The main instance has two responsibilities: handling its designated portion of the workload (loading a subset of the total rows) and maintaining the synchronization between all of the instances.
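The synchronization duty described above ("wait for all workers to signal barrier", then "tell workers to proceed") can be sketched for the multi-threaded case as follows. This is a minimal illustration using only the C++ standard library. `InstanceBarrier` and `runBarrierDemo` are hypothetical names, not part of Teradata PT, and a multi-process design would need an inter-process mechanism instead.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper (not part of Teradata PT): lets one main thread
// hold a fixed number of worker threads at a synchronization point.
class InstanceBarrier {
public:
    explicit InstanceBarrier(std::size_t workerCount)
        : workerCount_(workerCount) {
        assert(workerCount > 0);
    }

    // Worker side: signal arrival, then block until the main releases.
    void workerArrive() {
        std::unique_lock<std::mutex> lock(mutex_);
        const unsigned long seenGeneration = generation_;
        ++arrived_;
        changed_.notify_all();
        changed_.wait(lock, [&] { return generation_ != seenGeneration; });
    }

    // Main side, step 1: block until every worker has signaled.
    void waitForWorkers() {
        std::unique_lock<std::mutex> lock(mutex_);
        changed_.wait(lock, [&] { return arrived_ == workerCount_; });
    }

    // Main side, step 2: tell the workers to proceed.
    void releaseWorkers() {
        std::lock_guard<std::mutex> lock(mutex_);
        arrived_ = 0;
        ++generation_;
        changed_.notify_all();
    }

private:
    std::mutex mutex_;
    std::condition_variable changed_;
    const std::size_t workerCount_;
    std::size_t arrived_ = 0;
    unsigned long generation_ = 0;
};

// Demo: returns true if no worker got past the barrier before the main
// released it and every worker got past it afterwards.
bool runBarrierDemo(std::size_t workerCount) {
    InstanceBarrier barrier(workerCount);
    std::atomic<std::size_t> passed{0};
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < workerCount; ++i) {
        workers.emplace_back([&] {
            barrier.workerArrive();
            ++passed;
        });
    }
    barrier.waitForWorkers();
    const bool heldBack = (passed.load() == 0);
    // A real main instance would do its between-steps work here
    // (for example, retrieving and distributing the TELINFO area).
    barrier.releaseWorkers();
    for (std::thread& worker : workers) {
        worker.join();
    }
    return heldBack && passed.load() == workerCount;
}
```

Splitting the main side into `waitForWorkers` and `releaseWorkers` leaves room for work that must happen while every worker is held, which is the shape of the TD_SYNC_TELINFO case.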
The following is the execution method for the main instance:
#include "connection.h"
#include "schema.h"
#include "DMLGroup.h"

using namespace teradata::client::API;

int returnValue = 0;
Connection* mConnection = new Connection();

/**********************************************
* Add Connection Parameters
**********************************************/
mConnection->AddAttribute(TD_SYSTEM_OPERATOR, TD_LOAD);

/* Add Attributes */
// Main Instance Specific Attributes
//   TD_INSTANCE_NUM: Main is always "1"
//   TD_MAX_INSTANCES: Total number of instances including main
//   TD_MAX_SESSIONS: Sessions are divided between main
//                    and workers
//   TD_MIN_SESSIONS

/* Add Schema */
/* Add DMLGroups */

// Note: Except for the special attributes listed above, both main
// and worker instances should have exactly the same Connection
// Parameters.

/**********************************************
* Initiate
**********************************************/
while ( returnValue != TD_END_Method ){
    returnValue = mConnection->Initiate();
    switch ( returnValue ){
        case TD_SYNC_Barrier:
            // 1) Wait for all workers to signal barrier
            // 2) Tell workers to proceed
            // 3) Continue
            break;
        case TD_SYNC_TELINFO:
            // 1) Wait for all workers to signal barrier
            // 2) Use GetTELINFO to retrieve TELINFO area
            // 3) Pass copy of TELINFO area to workers
            // 4) Tell workers to proceed
            // 5) Continue
            break;
        case TD_END_Method:
            // Method is Complete
            break;
        default:
            // When errors occur:
            // 1) Tell workers to call Terminate; all instances
            //    must Terminate synchronously. See Terminate
            //    setup below.
            // 2) After Terminate, get error info
            // 3) Quit or restart
            return TD_ERROR;
    }
}
// Wait for workers to finish, then continue

/**********************************************
* Acquisition
**********************************************/
char rowBuffer[256];
unsigned short rowLength = 0;
bool doneExporting = false;
int rcGetrow = 0;
returnValue = 0;

while (!doneExporting){
    // Get Row For Load
    // (getRow is an application-supplied routine, not an API call)
    rcGetrow = getRow( rowBuffer );
    if (rcGetrow != TD_END_Method){
        // Load Row: the first two bytes of the buffer hold the row length
        rowLength = *((unsigned short *)rowBuffer);
        returnValue = mConnection->PutRow( rowBuffer+2, rowLength );
        switch ( returnValue ){
            case TD_Success:
                // Continue to Next Row
                break;
            default:
                // When errors occur:
                // 1) Tell workers to call Terminate; all instances
                //    must Terminate synchronously. See Terminate
                //    setup below.
                // 2) After Terminate, get error info
                // 3) Quit or restart
                return TD_ERROR;
        }
    }else {
        /* End Acquisition Phase */
        doneExporting = true;
        returnValue = mConnection->EndAcquisition();
        switch ( returnValue ){
            case TD_SYNC_Barrier:
                // 1) Wait for all workers to signal barrier
                // 2) Tell workers to proceed
                // 3) Continue
                break;
            case TD_SYNC_TELINFO:
                // 1) Wait for all workers to signal barrier
                // 2) Use GetTELINFO to retrieve TELINFO area
                // 3) Pass copy of TELINFO area to workers
                // 4) Tell workers to proceed
                // 5) Continue
                break;
            case TD_END_Method:
                // Acquisition Complete
                break;
            default:
                // When errors occur:
                // 1) Tell workers to call Terminate; all instances
                //    must Terminate synchronously. See Terminate
                //    setup below.
                // 2) After Terminate, get error info
                // 3) Quit or restart
                return TD_ERROR;
        }
    }
}
// Wait for workers to finish, then continue

/**********************************************
* Application
**********************************************/
returnValue = 0;
while ( returnValue != TD_END_Method ) {
    returnValue = mConnection->ApplyRows();
    switch ( returnValue ){
        case TD_SYNC_Barrier:
            // 1) Wait for all workers to signal barrier
            // 2) Tell workers to proceed
            // 3) Continue
            break;
        case TD_SYNC_TELINFO:
            // 1) Wait for all workers to signal barrier
            // 2) Use GetTELINFO to retrieve TELINFO area
            // 3) Pass copy of TELINFO area to workers
            // 4) Tell workers to proceed
            // 5) Continue
            break;
        case TD_END_Method:
            // Method is Complete
            break;
        default:
            // When errors occur:
            // 1) Tell workers to call Terminate; all instances
            //    must Terminate synchronously. See Terminate
            //    setup below.
            // 2) After Terminate, get error info
            // 3) Quit or restart
            return TD_ERROR;
    }
}
// Wait for workers to finish, then continue

/**********************************************
* Terminate
**********************************************/
returnValue = 0;
while ( returnValue != TD_END_Method ) {
    returnValue = mConnection->Terminate();
    switch ( returnValue ){
        case TD_SYNC_Barrier:
            // 1) Wait for all workers to signal barrier
            // 2) Tell workers to proceed
            // 3) Continue
            break;
        case TD_SYNC_TELINFO:
            // 1) Wait for all workers to signal barrier
            // 2) Use GetTELINFO to retrieve TELINFO area
            // 3) Pass copy of TELINFO area to workers
            // 4) Tell workers to proceed
            // 5) Continue
            break;
        case TD_END_Method:
            // Method is Complete
            break;
        default:
            // When errors occur:
            // 1) Tell workers to call Terminate; all instances
            //    must Terminate synchronously.
            // 2) After Terminate, get error info
            // 3) Quit or restart
            return TD_ERROR;
    }
}

/**********************************************
* Clean Up
**********************************************/
// delete Schema objects
// delete DMLGroup objects
delete mConnection;
return returnValue;
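The Initiate, Application, and Terminate phases in the main instance listing share one loop shape: call the method, react to the synchronization code, and repeat until the method reports completion. One way to factor that out is sketched below. `SyncCodes`, `SyncHandlers`, and `runPhase` are hypothetical names introduced for illustration, and the status codes are passed in as plain integers so that the sketch compiles without the Teradata PT headers.

```cpp
#include <cassert>
#include <functional>

// Hypothetical holder for the status codes. In real code it would be
// filled from the Teradata PT constants used in the listing
// (TD_END_Method, TD_SYNC_Barrier, TD_SYNC_TELINFO).
struct SyncCodes {
    int endMethod;
    int syncBarrier;
    int syncTelinfo;
};

// Hypothetical callbacks holding the instance-specific steps, for
// example "wait for all workers, then tell them to proceed".
struct SyncHandlers {
    std::function<void()> onBarrier;
    std::function<void()> onTelinfo;
};

// Calls `method` repeatedly until it reports completion.
// Returns true on completion and false on the first unrecognized code;
// the most recent code is stored in *lastCode when lastCode is not null.
bool runPhase(const std::function<int()>& method,
              const SyncCodes& codes,
              const SyncHandlers& handlers,
              int* lastCode = nullptr) {
    assert(static_cast<bool>(method));
    for (;;) {
        const int rc = method();
        if (lastCode != nullptr) {
            *lastCode = rc;
        }
        if (rc == codes.endMethod) {
            return true;   // method is complete
        }
        if (rc == codes.syncBarrier) {
            if (handlers.onBarrier) handlers.onBarrier();
        } else if (rc == codes.syncTelinfo) {
            if (handlers.onTelinfo) handlers.onTelinfo();
        } else {
            return false;  // error: caller starts the synchronized Terminate
        }
    }
}
```

Under these assumptions, a phase would be driven with a call such as `runPhase([&] { return mConnection->Initiate(); }, codes, handlers)`, and a `false` result would lead into the error steps listed in the `default` branches.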
The worker instance has two responsibilities: handling its designated portion of the workload (loading a subset of the total rows), and reporting synchronization codes to the main instance and following its instructions.
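Because all instances must terminate together when any one of them fails, the instances need a shared way to report an error. For a multi-threaded design, a minimal sketch is a single atomic value that records the first failing instance. `SharedJobStatus` is a hypothetical name, not part of Teradata PT, and the sketch assumes instance numbers start at 1 so that 0 can mean "no error".

```cpp
#include <atomic>
#include <cassert>

// Hypothetical helper (not part of Teradata PT): shared by all instance
// threads so that one failure can be seen by every other instance.
class SharedJobStatus {
public:
    // Records the first failing instance; later reports are ignored.
    // Returns true if this call was the first report.
    bool reportError(int instanceNum) {
        assert(instanceNum >= 1);
        int expected = kNoError;
        return firstFailed_.compare_exchange_strong(expected, instanceNum);
    }

    // Checked by each instance before starting its next phase.
    bool shouldTerminate() const {
        return firstFailed_.load() != kNoError;
    }

    // Instance number of the first reported failure, or 0 if none.
    int firstFailedInstance() const {
        return firstFailed_.load();
    }

private:
    static constexpr int kNoError = 0;
    std::atomic<int> firstFailed_{kNoError};
};
```

A worker would call `reportError` from its `default` branch, and the main instance would check `shouldTerminate` at each synchronization point before telling the workers to proceed or to call Terminate.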
The following is the execution method of the worker instance:
#include "connection.h"
#include "schema.h"
#include "DMLGroup.h"

using namespace teradata::client::API;

int returnValue = 0;
Connection* mConnection = new Connection();

/**********************************************
* Add Connection Parameters
**********************************************/
mConnection->AddAttribute(TD_SYSTEM_OPERATOR, TD_LOAD);

/* Add Attributes */
// Worker Instance Specific Attributes
//   TD_INSTANCE_NUM: Worker number must be greater than 1 and
//                    no worker can have the same instance number
//                    as another instance.
//   TD_MAX_INSTANCES: Total number of instances including main.
//                     This must be set correctly as the worker will
//                     use this when accessing TELINFO area.

/* Add Schema */
/* Add DMLGroups */

// Note: Except for the special attributes listed above, both main
// and worker instances should have exactly the same Connection
// Parameters.

/**********************************************
* Initiate
**********************************************/
while ( returnValue != TD_END_Method ){
    returnValue = mConnection->Initiate();
    switch ( returnValue ){
        case TD_SYNC_Barrier:
            // 1) Signal barrier to main
            // 2) Wait for main signal
            // 3) Continue
            break;
        case TD_SYNC_TELINFO:
            // 1) Signal barrier to main
            // 2) Use PutTELINFO to set the TELINFO
            //    area passed by the main.
            // 3) Continue
            break;
        case TD_END_Method:
            // Method is Complete
            break;
        default:
            // When errors occur:
            // 1) Signal error to main; all instances
            //    must Terminate synchronously. See Terminate
            //    setup below.
            // 2) After Terminate, get error info
            // 3) Quit or restart
            return TD_ERROR;
    }
}
// Wait for main to finish, then continue

/**********************************************
* Acquisition
**********************************************/
char rowBuffer[256];
unsigned short rowLength = 0;
bool doneExporting = false;
int rcGetrow = 0;
returnValue = 0;

while (!doneExporting){
    // Get Row For Load
    // (getRow is an application-supplied routine, not an API call)
    rcGetrow = getRow( rowBuffer );
    if (rcGetrow != TD_END_Method){
        // Load Row: the first two bytes of the buffer hold the row length
        rowLength = *((unsigned short *)rowBuffer);
        returnValue = mConnection->PutRow( rowBuffer+2, rowLength );
        switch ( returnValue ){
            case TD_Success:
                // Continue to Next Row
                break;
            default:
                // When errors occur:
                // 1) Signal error to main; all instances
                //    must Terminate synchronously. See Terminate
                //    setup below.
                // 2) After Terminate, get error info
                // 3) Quit or restart
                return TD_ERROR;
        }
    }else {
        /* End Acquisition Phase */
        doneExporting = true;
        returnValue = mConnection->EndAcquisition();
        switch ( returnValue ){
            case TD_SYNC_Barrier:
                // 1) Signal barrier to main
                // 2) Wait for main signal
                // 3) Continue
                break;
            case TD_SYNC_TELINFO:
                // 1) Signal barrier to main
                // 2) Use PutTELINFO to set the TELINFO
                //    area passed by the main.
                // 3) Continue
                break;
            case TD_END_Method:
                // Acquisition Complete
                break;
            default:
                // When errors occur:
                // 1) Signal error to main; all instances
                //    must Terminate synchronously. See Terminate
                //    setup below.
                // 2) After Terminate, get error info
                // 3) Quit or restart
                return TD_ERROR;
        }
    }
}
// Wait for main to finish, then continue

/**********************************************
* Application
**********************************************/
returnValue = 0;
while ( returnValue != TD_END_Method ) {
    returnValue = mConnection->ApplyRows();
    switch ( returnValue ){
        case TD_SYNC_Barrier:
            // 1) Signal barrier to main
            // 2) Wait for main signal
            // 3) Continue
            break;
        case TD_SYNC_TELINFO:
            // 1) Signal barrier to main
            // 2) Use PutTELINFO to set the TELINFO
            //    area passed by the main.
            // 3) Continue
            break;
        case TD_END_Method:
            // Method is Complete
            break;
        default:
            // When errors occur:
            // 1) Signal error to main; all instances
            //    must Terminate synchronously. See Terminate
            //    setup below.
            // 2) After Terminate, get error info
            // 3) Quit or restart
            return TD_ERROR;
    }
}
// Wait for main to finish, then continue

/**********************************************
* Terminate
**********************************************/
returnValue = 0;
while ( returnValue != TD_END_Method ) {
    returnValue = mConnection->Terminate();
    switch ( returnValue ){
        case TD_SYNC_Barrier:
            // 1) Signal barrier to main
            // 2) Wait for main signal
            // 3) Continue
            break;
        case TD_SYNC_TELINFO:
            // 1) Signal barrier to main
            // 2) Use PutTELINFO to set the TELINFO
            //    area passed by the main.
            // 3) Continue
            break;
        case TD_END_Method:
            // Method is Complete
            break;
        default:
            // When errors occur:
            // 1) Signal error to main; all instances
            //    must Terminate synchronously.
            // 2) After Terminate, get error info
            // 3) Quit or restart
            return TD_ERROR;
    }
}

/**********************************************
* Clean Up
**********************************************/
// delete Schema objects
// delete DMLGroup objects
delete mConnection;
return returnValue;
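The instance-numbering rules stated in the listing comments (the main instance is always 1, every worker has a unique number greater than 1, and all instances agree on the total) can be captured in a small planning step before any connection is created. The sketch below numbers workers 2 through N, which is one assignment that satisfies those rules. `InstanceSettings` and `planInstances` are hypothetical names; the resulting values would be supplied as the TD_INSTANCE_NUM and TD_MAX_INSTANCES attributes.

```cpp
#include <cassert>
#include <vector>

// Hypothetical per-instance settings (not a Teradata PT type).
struct InstanceSettings {
    int instanceNum;   // value intended for TD_INSTANCE_NUM
    int maxInstances;  // value intended for TD_MAX_INSTANCES
    bool isMain;       // true only for instance 1
};

// Builds one settings record per instance: the main first, then workers.
std::vector<InstanceSettings> planInstances(int totalInstances) {
    assert(totalInstances >= 1);
    std::vector<InstanceSettings> plan;
    for (int n = 1; n <= totalInstances; ++n) {
        plan.push_back(InstanceSettings{n, totalInstances, n == 1});
    }
    return plan;
}
```

Generating every record from one `totalInstances` value keeps the instance count identical across the main and the workers, which the worker listing notes is required for accessing the TELINFO area.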