Code Example - Parallel Transporter

Teradata Parallel Transporter Application Programming Interface Programmer Guide

Product
Parallel Transporter
Release Number
16.10
Published
May 2017
Language
English (United States)
Last Update
2018-05-15
dita:mapPath
pev1488824663354.ditamap
dita:ditavalPath
Audience_PDF_include.ditaval
dita:id
B035-2516
lifecycle
previous
Product Category
Teradata Tools and Utilities

The following is a simple example of how to utilize Teradata PT in a parallel environment. The code is not intended for actual use but rather to highlight the keys to the Teradata PT parallel process. The architecture of the example could be implemented in either a multi-threaded or multi-process environment.

This is a multi-instance example using the Load driver. The structure of the example involves two components: a master component and a slave component.

The master component has two responsibilities: handling its designated portion of the workload (loading a subset of the total rows) and maintaining the synchronization between all of the instances.

The following is the main execution method for the master instance component:

#include "connection.h"
#include "schema.h"
#include "DMLGroup.h"

using teradata::client::API;

int returnValue = 0;
Connection* mConnection = new Connection();
/**********************************************
* Add Connection Parameters
**********************************************/

mConnection->AddAttribute(TD_SYSTEM_OPERATOR,TD_LOAD);

/* Add Attributes */
// Master Instance Specific Attributes
//	TD_INSTANCE_NUM: Master is always “1”
//	TD_MAX_INSTANCES: Total number of instances including master
//	TD_MAX_SESSIONS: Number of sessions are divided between master
//			     and slaves
//	TD_MIN_SESSIONS

/* Add Schema */

/* Add DMLGroups */

// Note: Except for the special attributes listed above, both master
// and slave instances should have exactly the same Connection
// Parameters.
/**********************************************
* Initiate
**********************************************/

// Initiate phase (master): keep calling Initiate() until it reports
// TD_END_Method, servicing each synchronization code in between.
while ( returnValue != TD_END_Method ){

	returnValue = mConnection->Initiate();

	if ( returnValue == TD_SYNC_Barrier ){
		// Barrier synchronization:
		// 1) Wait for all slaves to signal barrier
		// 2) Tell slaves to proceed
		// 3) Continue
	}
	else if ( returnValue == TD_SYNC_TELINFO ){
		// TELINFO synchronization:
		// 1) Wait for all slaves to signal barrier
		// 2) Use GetTELINFO to retrieve TELINFO area
		// 3) Pass copy of TELINFO area to slaves
		// 4) Tell slaves to proceed
		// 5) Continue
	}
	else if ( returnValue != TD_END_Method ){
		// Any other code is an error:
		// 1) Tell slaves to call Terminate - all instances
		//	must Terminate synchronously. See Terminate
		//	setup below.
		// 2) After Terminate, get error info
		// 3) Quit or restart
		return TD_ERROR;
	}
	// TD_END_Method: the method is complete and the loop condition ends
	// the phase.
}

//Wait for slaves to finish, then continue	
/**********************************************
* Acquisition
**********************************************/

// Acquisition phase (master): hand this instance's rows to the Load driver
// one at a time, then end acquisition once the row source is exhausted.
char rowBuffer[256];
unsigned short rowLength = 0;
bool doneExporting = false;
// Status returned by getRow(). getRow() is not defined in this example;
// presumably it is the application's own row source - confirm.
int rcGetrow = 0;
returnValue = 0;

while (!doneExporting){

	//Get Row For Load
	rcGetrow = getRow( rowBuffer );

	if (rcGetrow != TD_END_Method){
		//Load Row
		// The first two bytes of the buffer are read as the row length;
		// the row data passed to PutRow starts right after them.
		rowLength = *((unsigned short *)rowBuffer);
		returnValue = mConnection->PutRow( rowBuffer+2, rowLength );

		switch ( returnValue ){

			case TD_Success:
				//Continue to Next Row
				break;

			default:
				//When errors occur:
				// 1) Tell slaves to call Terminate - all instances
				//	must Terminate synchronously. See Terminate
				//	setup below.
				// 2) After Terminate, get error info
				// 3) Quit or restart
				return TD_ERROR;
		}

	}else {
		/* End Acquisition Phase */
		doneExporting = true;
		// NOTE(review): EndAcquisition is called only once here, whereas
		// the Initiate/ApplyRows/Terminate phases repeat their call until
		// TD_END_Method is returned - confirm whether a retry is needed
		// after a synchronization code.
		returnValue = mConnection->EndAcquisition();

		switch ( returnValue ){

			case TD_SYNC_Barrier:
				// 1) Wait for all slaves to signal barrier
				// 2) Tell slaves to proceed
				// 3) Continue
				break;

			case TD_SYNC_TELINFO:
				// 1) Wait for all slaves to signal barrier
				// 2) Use GetTELINFO to retrieve TELINFO area
				// 3) Pass copy of TELINFO area to slaves
				// 4) Tell slaves to proceed
				// 5) Continue
				break;

			case TD_END_Method:
				//Acquisition Complete
				break;

			default:
				//When errors occur:
				// 1) Tell slaves to call Terminate - all instances
				//	must Terminate synchronously. See Terminate
				//	setup below.
				// 2) After Terminate, get error info
				// 3) Quit or restart
				return TD_ERROR;
		}
	}
}

//Wait for slaves to finish, then continue
/**********************************************
* Application
**********************************************/
// Application phase (master): reset the status, then keep calling
// ApplyRows() until it reports TD_END_Method.
returnValue = 0;

while ( returnValue != TD_END_Method ) {

	returnValue = mConnection->ApplyRows();

	if ( returnValue == TD_SYNC_Barrier ) {
		// Barrier synchronization:
		// 1) Wait for all slaves to signal barrier
		// 2) Tell slaves to proceed
		// 3) Continue
	}
	else if ( returnValue == TD_SYNC_TELINFO ) {
		// TELINFO synchronization:
		// 1) Wait for all slaves to signal barrier
		// 2) Use GetTELINFO to retrieve TELINFO area
		// 3) Pass copy of TELINFO area to slaves
		// 4) Tell slaves to proceed
		// 5) Continue
	}
	else if ( returnValue != TD_END_Method ) {
		// Any other code is an error:
		// 1) Tell slaves to call Terminate - all instances
		//	must Terminate synchronously. See Terminate
		//	setup below.
		// 2) After Terminate, get error info
		// 3) Quit or restart
		return TD_ERROR;
	}
	// TD_END_Method: the method is complete and the loop condition ends
	// the phase.
}
//Wait for slaves to finish, then continue
/**********************************************
* Terminate
**********************************************/

// Terminate phase (master): reset the status, then keep calling
// Terminate() until it reports TD_END_Method.
returnValue = 0;

while ( returnValue != TD_END_Method ) {

	returnValue = mConnection->Terminate();

	if ( returnValue == TD_END_Method ) {
		//Method is Complete
		break;
	}

	if ( returnValue == TD_SYNC_Barrier ) {
		// 1) Wait for all slaves to signal barrier
		// 2) Tell slaves to proceed
		// 3) Continue
		continue;
	}

	if ( returnValue == TD_SYNC_TELINFO ) {
		// 1) Wait for all slaves to signal barrier
		// 2) Use GetTELINFO to retrieve TELINFO area
		// 3) Pass copy of TELINFO area to slaves
		// 4) Tell slaves to proceed
		// 5) Continue
		continue;
	}

	// Any other code is an error:
	// 1) Tell slaves to call Terminate - all instances
	//	must Terminate synchronously.
	// 2) After Terminate, get error info
	// 3) Quit or restart
	return TD_ERROR;
}
/**********************************************
* Clean Up
**********************************************/
//delete Schema objects
//delete DMLGroup objects
// Release the Connection allocated during setup and report the final status.
delete mConnection;
return returnValue;

The slave instance has three responsibilities: handling its designated workload (loading a subset of the total rows), reporting synchronization codes to the master, and following the master’s instructions.

The following is the main execution method of the slave component:

#include "connection.h"
#include "schema.h"
#include "DMLGroup.h"

using teradata::client::API;

int returnValue = 0;
Connection* mConnection = new Connection();
/**********************************************
* Add Connection Parameters
**********************************************/

mConnection->AddAttribute(TD_SYSTEM_OPERATOR,TD_LOAD);

/* Add Attributes */

// Slave Instance Specific Attributes
//	TD_INSTANCE_NUM: Slave number must be greater than 1 and
//				no slave can have same instance number as another instance.
//	TD_MAX_INSTANCES: Total number of instances including master
//				This must be set correctly as the slave will
//				use this when accessing TELINFO area.

/* Add Schema */

/* Add DMLGroups */

// Note: Except for the special attributes listed above, both master
// and slave instances should have exactly the same Connection
// Parameters.
/**********************************************
* Initiate
**********************************************/

// Initiate phase (slave): keep calling Initiate() until it reports
// TD_END_Method, reporting each synchronization code to the master.
while ( returnValue != TD_END_Method ){

	returnValue = mConnection->Initiate();

	if ( returnValue == TD_SYNC_Barrier ){
		// Barrier synchronization:
		// 1) Signal barrier to master
		// 2) Wait for master signal
		// 3) Continue
	}
	else if ( returnValue == TD_SYNC_TELINFO ){
		// TELINFO synchronization:
		// 1) Signal barrier to master
		// 2) Use PutTELINFO to set the TELINFO
		//	area passed by the master.
		// 3) Continue
	}
	else if ( returnValue != TD_END_Method ){
		// Any other code is an error:
		// 1) Signal error to master - all instances
		//	must Terminate synchronously. See Terminate
		//	setup below.
		// 2) After Terminate, get error info
		// 3) Quit or restart
		return TD_ERROR;
	}
	// TD_END_Method: the method is complete and the loop condition ends
	// the phase.
}

//Wait for master to finish, then continue	
/**********************************************
* Acquisition
**********************************************/

// Acquisition phase (slave): hand this instance's rows to the Load driver
// one at a time, then end acquisition once the row source is exhausted.
char rowBuffer[256];
unsigned short rowLength = 0;
bool doneExporting = false;
// Status returned by getRow(). getRow() is not defined in this example;
// presumably it is the application's own row source - confirm.
int rcGetrow = 0;
returnValue = 0;

while (!doneExporting){

	//Get Row For Load
	rcGetrow = getRow( rowBuffer );

	if (rcGetrow != TD_END_Method){
		//Load Row
		// The first two bytes of the buffer are read as the row length;
		// the row data passed to PutRow starts right after them.
		rowLength = *((unsigned short *)rowBuffer);
		returnValue = mConnection->PutRow( rowBuffer+2, rowLength );

		switch ( returnValue ){

			case TD_Success:
				//Continue to Next Row
				break;

			default:
				//When errors occur:
				// 1) Signal error to master - all instances
				//	must Terminate synchronously. See Terminate
				//	setup below.
				// 2) After Terminate, get error info
				// 3) Quit or restart
				return TD_ERROR;
		}

	}else {
		/* End Acquisition Phase */
		doneExporting = true;
		// NOTE(review): EndAcquisition is called only once here, whereas
		// the Initiate/ApplyRows/Terminate phases repeat their call until
		// TD_END_Method is returned - confirm whether a retry is needed
		// after a synchronization code.
		returnValue = mConnection->EndAcquisition();

		switch ( returnValue ){

			case TD_SYNC_Barrier:
				// 1) Signal barrier to master
				// 2) Wait for master signal
				// 3) Continue
				break;

			case TD_SYNC_TELINFO:
				// 1) Signal barrier to master
				// 2) Use PutTELINFO to set the TELINFO
				//	area passed by the master.
				// 3) Continue
				break;

			case TD_END_Method:
				//Acquisition Complete
				break;

			default:
				//When errors occur:
				// 1) Signal error to master - all instances
				//	must Terminate synchronously. See Terminate
				//	setup below.
				// 2) After Terminate, get error info
				// 3) Quit or restart
				return TD_ERROR;
		}
	}
}

//Wait for master to finish, then continue
/**********************************************
* Application
**********************************************/
// Application phase (slave): reset the status, then keep calling
// ApplyRows() until it reports TD_END_Method.
returnValue = 0;

while ( returnValue != TD_END_Method ) {

	returnValue = mConnection->ApplyRows();

	if ( returnValue == TD_SYNC_Barrier ) {
		// Barrier synchronization:
		// 1) Signal barrier to master
		// 2) Wait for master signal
		// 3) Continue
	}
	else if ( returnValue == TD_SYNC_TELINFO ) {
		// TELINFO synchronization:
		// 1) Signal barrier to master
		// 2) Use PutTELINFO to set the TELINFO
		//	area passed by the master.
		// 3) Continue
	}
	else if ( returnValue != TD_END_Method ) {
		// Any other code is an error:
		// 1) Signal error to master - all instances
		//	must Terminate synchronously. See Terminate
		//	setup below.
		// 2) After Terminate, get error info
		// 3) Quit or restart
		return TD_ERROR;
	}
	// TD_END_Method: the method is complete and the loop condition ends
	// the phase.
}
//Wait for master to finish, then continue
/**********************************************
* Terminate
**********************************************/

// Terminate phase (slave): reset the status, then keep calling
// Terminate() until it reports TD_END_Method.
returnValue = 0;

while ( returnValue != TD_END_Method ) {

	returnValue = mConnection->Terminate();

	if ( returnValue == TD_END_Method ) {
		//Method is Complete
		break;
	}

	if ( returnValue == TD_SYNC_Barrier ) {
		// 1) Signal barrier to master
		// 2) Wait for master signal
		// 3) Continue
		continue;
	}

	if ( returnValue == TD_SYNC_TELINFO ) {
		// 1) Signal barrier to master
		// 2) Use PutTELINFO to set the TELINFO
		//	area passed by the master.
		// 3) Continue
		continue;
	}

	// Any other code is an error:
	// 1) Signal error to master - all instances
	//	must Terminate synchronously.
	// 2) After Terminate, get error info
	// 3) Quit or restart
	return TD_ERROR;
}
/**********************************************
* Clean Up
**********************************************/
//delete Schema objects
//delete DMLGroup objects
// Release the Connection allocated during setup and report the final status.
delete mConnection;
return returnValue;