Code Example - Parallel Transporter

Teradata® Parallel Transporter Application Programming Interface Programmer Guide - 17.20

Product
Parallel Transporter
Release Number
17.20
Published
June 2022
Language
English (United States)
Last Update
2022-10-11
dita:mapPath
fag1645201363032.ditamap
dita:ditavalPath
obe1474387269547.ditaval
dita:id
B035-2516
Product Category
Teradata Tools and Utilities

The following is a simple example of how to utilize Teradata PT in a parallel environment. The code is not intended for actual use but rather to highlight the keys to the Teradata PT parallel process. The architecture of the example could be implemented in either a multi-threaded or multi-process environment.

This is a multi-instance example using the Load driver. The structure of the example involves two instances: a main instance and a worker instance.

The main instance has two responsibilities: handling its designated portion of the workload (loading a subset of the total rows) and maintaining the synchronization between all of the instances.

The following is the execution method for the main instance:

#include "connection.h"
#include "schema.h"
#include "DMLGroup.h"

using teradata::client::API;

int returnValue = 0;
Connection* mConnection = new Connection();
/**********************************************
* Add Connection Parameters
**********************************************/

mConnection->AddAttribute(TD_SYSTEM_OPERATOR,TD_LOAD);

/* Add Attributes */
// Main Instance Specific Attributes
//	TD_INSTANCE_NUM: Main is always “1”
//	TD_MAX_INSTANCES: Total number of instances including main
//	TD_MAX_SESSIONS: Number of sessions are divided between main
//			     and workers
//	TD_MIN_SESSIONS

/* Add Schema */

/* Add DMLGroups */

// Note: Except for the special attributes listed above, both main
// and worker instances should have exactly the same Connection
// Parameters.
/**********************************************
* Initiate
**********************************************/

// Keep calling Initiate until it reports TD_END_Method. Every other
// recognised status is a synchronization point the main must coordinate.
while ( returnValue != TD_END_Method ){

	returnValue = mConnection->Initiate();

	if ( returnValue == TD_SYNC_Barrier ){
		// Barrier reached:
		//  - wait until every worker has signalled the barrier
		//  - release the workers
		//  - loop around and call Initiate again
	}
	else if ( returnValue == TD_SYNC_TELINFO ){
		// TELINFO exchange:
		//  - wait until every worker has signalled the barrier
		//  - fetch the TELINFO area with GetTELINFO
		//  - hand a copy of the TELINFO area to each worker
		//  - release the workers
		//  - loop around and call Initiate again
	}
	else if ( returnValue == TD_END_Method ){
		// Initiate has finished; the loop condition ends the loop.
	}
	else {
		// Any other status is an error:
		//  - tell the workers to call Terminate; all instances must
		//    Terminate synchronously (see the Terminate section below)
		//  - after Terminate, collect the error information
		//  - quit or restart
		return TD_ERROR;
	}
}

// Wait for the workers to finish Initiate, then continue.
/**********************************************
* Acquisition
**********************************************/

// Staging buffer for one row. The code below reads the row length from the
// start of the buffer and passes the bytes after that prefix to PutRow.
char rowBuffer[256];
unsigned short rowLength = 0;
// Set once the row source reports that there are no more rows to load.
bool doneExporting = false;
// Status from getRow(). getRow is the application's own row source and is
// not shown in this example; it is assumed to return an int status and to
// return TD_END_Method when no rows remain - confirm against your helper.
int rcGetrow = 0;
returnValue = 0;

while (!doneExporting){

	//Get Row For Load
	rcGetrow = getRow( rowBuffer );

	if (rcGetrow != TD_END_Method){
		//Load Row: the length prefix is read first, then the data that
		//follows it is handed to the Load driver.
		rowLength = *((unsigned short *)rowBuffer);
		returnValue = mConnection->PutRow( rowBuffer+2, rowLength );

		switch ( returnValue ){

			case TD_Success:
				//Continue to Next Row
				break;

			default:
				//When errors occur:
				// 1) Tell workers to call Terminate - all instances
				//	must Terminate synchronously. See Terminate
				//	setup below.
				// 2) After Terminate, get error info
				// 3) Quit or restart
				return TD_ERROR;
		}

	}else {
		/* End Acquisition Phase */
		doneExporting = true;
		returnValue = 0;

		// As with Initiate, ApplyRows and Terminate, EndAcquisition is
		// called again after every synchronization point until it
		// reports TD_END_Method.
		while ( returnValue != TD_END_Method ){

			returnValue = mConnection->EndAcquisition();

			switch ( returnValue ){

				case TD_SYNC_Barrier:
					// 1) Wait for all workers to signal barrier
					// 2) Tell workers to proceed
					// 3) Continue
					break;

				case TD_SYNC_TELINFO:
					// 1) Wait for all workers to signal barrier
					// 2) Use GetTELINFO to retrieve TELINFO area
					// 3) Pass copy of TELINFO area to workers
					// 4) Tell workers to proceed
					// 5) Continue
					break;

				case TD_END_Method:
					//Acquisition Complete
					break;

				default:
					//When errors occur:
					// 1) Tell workers to call Terminate - all instances
					//	must Terminate synchronously. See Terminate
					//	setup below.
					// 2) After Terminate, get error info
					// 3) Quit or restart
					return TD_ERROR;
			}
		}
	}
}

//Wait for workers to finish, then continue
/**********************************************
* Application
**********************************************/
returnValue = 0;

// Keep calling ApplyRows until it reports TD_END_Method, servicing each
// synchronization point in between.
while ( returnValue != TD_END_Method ) {

	returnValue = mConnection->ApplyRows();

	if ( returnValue == TD_SYNC_Barrier ){
		// Barrier reached:
		//  - wait until every worker has signalled the barrier
		//  - release the workers
		//  - loop around and call ApplyRows again
	}
	else if ( returnValue == TD_SYNC_TELINFO ){
		// TELINFO exchange:
		//  - wait until every worker has signalled the barrier
		//  - fetch the TELINFO area with GetTELINFO
		//  - hand a copy of the TELINFO area to each worker
		//  - release the workers
		//  - loop around and call ApplyRows again
	}
	else if ( returnValue == TD_END_Method ){
		// ApplyRows has finished; the loop condition ends the loop.
	}
	else {
		// Any other status is an error:
		//  - tell the workers to call Terminate; all instances must
		//    Terminate synchronously (see the Terminate section below)
		//  - after Terminate, collect the error information
		//  - quit or restart
		return TD_ERROR;
	}
}
// Wait for the workers to finish the Application phase, then continue.
/**********************************************
* Terminate
**********************************************/

returnValue = 0;

// Keep calling Terminate until it reports TD_END_Method, servicing each
// synchronization point in between.
while ( returnValue != TD_END_Method ) {

	returnValue = mConnection->Terminate();

	if ( returnValue == TD_SYNC_Barrier ){
		// Barrier reached:
		//  - wait until every worker has signalled the barrier
		//  - release the workers
		//  - loop around and call Terminate again
	}
	else if ( returnValue == TD_SYNC_TELINFO ){
		// TELINFO exchange:
		//  - wait until every worker has signalled the barrier
		//  - fetch the TELINFO area with GetTELINFO
		//  - hand a copy of the TELINFO area to each worker
		//  - release the workers
		//  - loop around and call Terminate again
	}
	else if ( returnValue == TD_END_Method ){
		// Terminate has finished; the loop condition ends the loop.
	}
	else {
		// Any other status is an error:
		//  - tell the workers to call Terminate; all instances must
		//    Terminate synchronously
		//  - after Terminate, collect the error information
		//  - quit or restart
		return TD_ERROR;
	}
}
/**********************************************
* Clean Up
**********************************************/
// Schema objects would be deleted here.
// DMLGroup objects would be deleted here.
delete mConnection;
// Hand the last status from the Terminate loop back to the caller.
return returnValue;

The worker instance has two responsibilities: handling its designated workload (loading a subset of the total rows) and staying synchronized with the main instance by reporting synchronization codes to it and following its instructions.

The following is the execution method of the worker instance:

#include "connection.h"
#include "schema.h"
#include "DMLGroup.h"

using teradata::client::API;

int returnValue = 0;
Connection* mConnection = new Connection();
/**********************************************
* Add Connection Parameters
**********************************************/

mConnection->AddAttribute(TD_SYSTEM_OPERATOR,TD_LOAD);

/* Add Attributes */

// Worker Instance Specific Attributes
//	TD_INSTANCE_NUM: Worker number must be greater than 1 and
//				no worker can have same instance number as another instance.
//	TD_MAX_INSTANCES: Total number of instances including main
//				This must be set correctly as the worker will
//				use this when accessing TELINFO area.

/* Add Schema */

/* Add DMLGroups */

// Note: Except for the special attributes listed above, both main
// and worker instances should have exactly the same Connection
// Parameters.
/**********************************************
* Initiate
**********************************************/

// Keep calling Initiate until it reports TD_END_Method. Every other
// recognised status is a synchronization point handled together with main.
while ( returnValue != TD_END_Method ){

	returnValue = mConnection->Initiate();

	if ( returnValue == TD_SYNC_Barrier ){
		// Barrier reached:
		//  - signal the barrier to main
		//  - wait for main's signal
		//  - loop around and call Initiate again
	}
	else if ( returnValue == TD_SYNC_TELINFO ){
		// TELINFO exchange:
		//  - signal the barrier to main
		//  - set the TELINFO area passed by main using PutTELINFO
		//  - loop around and call Initiate again
	}
	else if ( returnValue == TD_END_Method ){
		// Initiate has finished; the loop condition ends the loop.
	}
	else {
		// Any other status is an error:
		//  - signal the error to main; all instances must Terminate
		//    synchronously (see the Terminate section below)
		//  - after Terminate, collect the error information
		//  - quit or restart
		return TD_ERROR;
	}
}

// Wait for main to finish Initiate, then continue.
/**********************************************
* Acquisition
**********************************************/

// Staging buffer for one row. The code below reads the row length from the
// start of the buffer and passes the bytes after that prefix to PutRow.
char rowBuffer[256];
unsigned short rowLength = 0;
// Set once the row source reports that there are no more rows to load.
bool doneExporting = false;
// Status from getRow(). getRow is the application's own row source and is
// not shown in this example; it is assumed to return an int status and to
// return TD_END_Method when no rows remain - confirm against your helper.
int rcGetrow = 0;
returnValue = 0;

while (!doneExporting){

	//Get Row For Load
	rcGetrow = getRow( rowBuffer );

	if (rcGetrow != TD_END_Method){
		//Load Row: the length prefix is read first, then the data that
		//follows it is handed to the Load driver.
		rowLength = *((unsigned short *)rowBuffer);
		returnValue = mConnection->PutRow( rowBuffer+2, rowLength );

		switch ( returnValue ){

			case TD_Success:
				//Continue to Next Row
				break;

			default:
				//When errors occur:
				// 1) Signal error to main - all instances
				//	must Terminate synchronously. See Terminate
				//	setup below.
				// 2) After Terminate, get error info
				// 3) Quit or restart
				return TD_ERROR;
		}

	}else {
		/* End Acquisition Phase */
		doneExporting = true;
		returnValue = 0;

		// As with Initiate, ApplyRows and Terminate, EndAcquisition is
		// called again after every synchronization point until it
		// reports TD_END_Method.
		while ( returnValue != TD_END_Method ){

			returnValue = mConnection->EndAcquisition();

			switch ( returnValue ){

				case TD_SYNC_Barrier:
					// 1) Signal barrier to main
					// 2) Wait for main signal
					// 3) Continue
					break;

				case TD_SYNC_TELINFO:
					// 1) Signal barrier to main
					// 2) Use PutTELINFO to set the TELINFO
					//	area passed by the main.
					// 3) Continue
					break;

				case TD_END_Method:
					//Acquisition Complete
					break;

				default:
					//When errors occur:
					// 1) Signal error to main - all instances
					//	must Terminate synchronously. See Terminate
					//	setup below.
					// 2) After Terminate, get error info
					// 3) Quit or restart
					return TD_ERROR;
			}
		}
	}
}

//Wait for main to finish, then continue
/**********************************************
* Application
**********************************************/
returnValue = 0;

// Keep calling ApplyRows until it reports TD_END_Method, servicing each
// synchronization point together with main.
while ( returnValue != TD_END_Method ) {

	returnValue = mConnection->ApplyRows();

	if ( returnValue == TD_SYNC_Barrier ){
		// Barrier reached:
		//  - signal the barrier to main
		//  - wait for main's signal
		//  - loop around and call ApplyRows again
	}
	else if ( returnValue == TD_SYNC_TELINFO ){
		// TELINFO exchange:
		//  - signal the barrier to main
		//  - set the TELINFO area passed by main using PutTELINFO
		//  - loop around and call ApplyRows again
	}
	else if ( returnValue == TD_END_Method ){
		// ApplyRows has finished; the loop condition ends the loop.
	}
	else {
		// Any other status is an error:
		//  - signal the error to main; all instances must Terminate
		//    synchronously (see the Terminate section below)
		//  - after Terminate, collect the error information
		//  - quit or restart
		return TD_ERROR;
	}
}
// Wait for main to finish the Application phase, then continue.
/**********************************************
* Terminate
**********************************************/

returnValue = 0;

// Keep calling Terminate until it reports TD_END_Method, servicing each
// synchronization point together with main.
while ( returnValue != TD_END_Method ) {

	returnValue = mConnection->Terminate();

	if ( returnValue == TD_SYNC_Barrier ){
		// Barrier reached:
		//  - signal the barrier to main
		//  - wait for main's signal
		//  - loop around and call Terminate again
	}
	else if ( returnValue == TD_SYNC_TELINFO ){
		// TELINFO exchange:
		//  - signal the barrier to main
		//  - set the TELINFO area passed by main using PutTELINFO
		//  - loop around and call Terminate again
	}
	else if ( returnValue == TD_END_Method ){
		// Terminate has finished; the loop condition ends the loop.
	}
	else {
		// Any other status is an error:
		//  - signal the error to main; all instances must Terminate
		//    synchronously
		//  - after Terminate, collect the error information
		//  - quit or restart
		return TD_ERROR;
	}
}
/**********************************************
* Clean Up
**********************************************/
// Schema objects would be deleted here.
// DMLGroup objects would be deleted here.
delete mConnection;
// Hand the last status from the Terminate loop back to the caller.
return returnValue;