15.10 - Code Example - Parallel Transporter

Teradata Parallel Transporter Application Programming Interface

prodname
Parallel Transporter
vrm_release
15.10
category
Programming Reference
featnum
B035-2516-035K

Code Example

#include <cstring>

#include "connection.h"
#include "schema.h"
#include "DMLGroup.h"
using namespace teradata::client::API;
 
// Status code of the most recent API call. It is compared against TD_Error
// further down to decide whether each phase succeeded.
int returnValue = 0;
// Error text and error type; both are filled in by conn->GetErrorInfo() on
// the error paths.
char* errorMsg = NULL;
TD_ErrorType errorType;
 
cout << "*** Stream Driver Example ***" << endl;
 
// The one connection object that every later call in this example goes through.
Connection *conn = new Connection();
/**********************************************
* Set Operator Type and Trace/Log Levels
**********************************************/
conn->AddAttribute(TD_SYSTEM_OPERATOR,TD_STREAM);
// The two trace settings below are left commented out in this example.
//conn->AddAttribute(TD_TRACE_OUTPUT,"stream.txt");
//conn->AddArrayAttribute(TD_TRACE_LEVEL,2,TD_OPER_ALL,TD_OFF,NULL);
/**********************************************
* Add Attributes
**********************************************/
// Logon values. "user", "password" and "database" look like placeholders to
// be replaced with real values -- confirm before running.
conn->AddAttribute(TD_USER_NAME,"user");
conn->AddAttribute(TD_USER_PASSWORD,"password");
conn->AddAttribute(TD_TDP_ID,"database");
// Table names for this job. The target table "tdload" is the same table named
// in the INSERT statement added to the DML group.
conn->AddAttribute(TD_LOG_TABLE,"tdload_log");
conn->AddAttribute(TD_ERROR_TABLE,"tdload_e1");
conn->AddAttribute(TD_TARGET_TABLE,"tdload");
// Session-count, tenacity and robust settings; see the operator attribute
// reference for the exact meaning and units of each value.
conn->AddAttribute(TD_MAX_SESSIONS,4);
conn->AddAttribute(TD_MIN_SESSIONS,1);
conn->AddAttribute(TD_TENACITY_HOURS,2);
conn->AddAttribute(TD_TENACITY_SLEEP,3);
conn->AddAttribute(TD_ROBUST,"No");
/**********************************************
* Add Schema
*
* Builds the schema named "input" with eight columns and attaches it to
* the connection. The column names are the same names used as
* :placeholders in the INSERT statement and in the serialize-on / use
* lists of the DML group.
**********************************************/
 
Schema * schema = new Schema("input");
// Each call passes a column name, a data type constant and a number that
// appears to be the field length in bytes -- TODO confirm against the
// Schema::AddColumn reference.
schema->AddColumn("Associate_Id",TD_INTEGER,4);
schema->AddColumn("Associate_Name",TD_CHAR,25);
schema->AddColumn("Salary",TD_FLOAT,8);
schema->AddColumn("DOJ",TD_DATE,4);
schema->AddColumn("Designation",TD_VARCHAR,25);
// The DECIMAL column passes two extra arguments (5, 2) -- presumably
// precision and scale; confirm.
schema->AddColumn("Loan_Amount",TD_DECIMAL,4,5,2);
// NOTE(review): "Martial_Status" is presumably a misspelling of
// "Marital_Status". It is spelled the same way in the DML group, so any
// rename has to be made in both places.
schema->AddColumn("Martial_Status",TD_CHAR,1);
schema->AddColumn("No_Of_Dependents",TD_BYTEINT,1);
conn->AddSchema(schema);
/**********************************************
* Add DMLGroups
*
* Creates one DML group holding a single INSERT into tdload, lists all
* eight schema columns for both the serialize-on and the use list, and
* registers the group with the connection.
**********************************************/
// Receives the index assigned to the group by AddDMLGroup().
TD_Index dmlGroupIndex = 0;
 
DMLGroup * dmlGr = new DMLGroup();
dmlGr->AddStatement("INSERT INTO tdload( :Associate_Id, :Associate_Name, :Salary, :DOJ, :Designation, :Loan_Amount, :Martial_Status, :No_Of_Dependents);");
// Both list calls pass the column count (8), the eight column names, and a
// trailing NULL.
dmlGr->AddSerializeOn(8, "Associate_Id", "Associate_Name", "Salary", "DOJ", "Designation", "Loan_Amount", "Martial_Status", "No_Of_Dependents", NULL);
dmlGr->AddUseList(8, "Associate_Id", "Associate_Name", "Salary", "DOJ", "Designation", "Loan_Amount", "Martial_Status", "No_Of_Dependents", NULL);
// NOTE(review): the status stored here is not checked; the Initiate() call
// that follows overwrites returnValue.
returnValue = conn->AddDMLGroup(dmlGr,&dmlGroupIndex);
/**********************************************
* Initiate
*
* Calls Initiate() on the configured connection and prints the status.
* The status is tested against TD_Error by the code that follows.
**********************************************/
returnValue = conn->Initiate();
 
cout << "Driver Initiated with status " << returnValue << endl;
 
/**********************************************
* Acquisition
*
* Entered only when Initiate() returned a status below TD_Error.
* Rows are read with the user-supplied GetRowData() and handed to
* PutRow() one at a time. EndAcquisition() is called only when every
* PutRow() succeeded, and its own status is then checked as well.
* Each failing phase (Initiate, Acquisition, EndAcquisition) prints its
* own message followed by whatever GetErrorInfo() reports.
**********************************************/
if ( returnValue < TD_Error )
{
	char rowBuffer[78]; /* must include the EOL byte */
	int loadStatus = 0;
	TD_Length bytes;
	int count = 0;

	while( loadStatus != -1 )
	{
		loadStatus = GetRowData(rowBuffer,78);	// user function - reads in a
												// row of data from a file
												// returns -1 when end of file
												// reached
		if( loadStatus == -1 )
		{
			break;
		}

		/* Pick up the first 2 bytes as the row length of the indicator mode
			record. The bytes are copied out rather than read through an
			unsigned short pointer, because a char buffer is not guaranteed
			to be suitably aligned for that access. */
		unsigned short rowLength = 0;
		std::memcpy(&rowLength, rowBuffer, sizeof(rowLength));
		bytes = rowLength;
		returnValue = conn->PutRow( rowBuffer + 2 , bytes);

		if ( returnValue >= TD_Error )
		{
			cout << "PutRow failed on row " << count+1;
			cout << " with status " << returnValue << endl;
			loadStatus = GetRowData(NULL,0); //user function - close file
			/* Stop here regardless of what the close call returned, so a
				later PutRow cannot overwrite the failing status. */
			break;
		}

		count++;
	}

	if ( returnValue < TD_Error )
	{
		cout << "Sent " << count << " rows" << endl;
		/**********************************************
		* End Acquisition
		**********************************************/

		returnValue = conn->EndAcquisition();

		cout << "Acquisition completed with status " << returnValue << endl;

		if ( returnValue >= TD_Error )
		{
			//Get Error Information
			cout << "Error occured during EndAcquisition" << endl;
			conn->GetErrorInfo(&errorMsg,&errorType);
			if ( errorMsg != NULL ){
				cout << errorMsg << endl;
				cout << "Type: " << errorType << endl;
			}else{
				cout << "No Error Info Available" << endl;
			}
		}

	}else{
		//Get Error Information (a PutRow call failed)
		cout << "Error occured during Acquisition" << endl;
		conn->GetErrorInfo(&errorMsg,&errorType);
		if ( errorMsg != NULL ){
			cout << errorMsg << endl;
			cout << "Type: " << errorType << endl;
		}else{
			cout << "No Error Info Available" << endl;
		}
	}

}else{
	//Get Error Information
	cout << "Error occured during Initiate" << endl;
	conn->GetErrorInfo(&errorMsg,&errorType);
	if ( errorMsg != NULL ){
		cout << errorMsg << endl;
		cout << "Type: " << errorType << endl;
	}else{
		cout << "No Error Info Available" << endl;
	}
}
/**********************************************
* Terminate
*
* Called unconditionally: this code sits after the Initiate/Acquisition
* if/else chain, so it runs whether or not the earlier phases succeeded.
**********************************************/
 
returnValue = conn->Terminate();
 
cout << "Driver Terminated with status " << returnValue << endl;
 
// A status at or above TD_Error is treated as a failure; print whatever
// error text and type the connection reports for it.
if ( returnValue >= TD_Error )
{
	//Get Error Information
	cout << "Error occured during Terminate" << endl;
	conn->GetErrorInfo(&errorMsg,&errorType);
	if ( errorMsg != NULL ){
		cout << errorMsg << endl;
		cout << "Type: " << errorType << endl;
	}else{
		cout << "No Error Info Available" << endl;
	}
}
/**********************************************
* Clean Up
*
* Frees the three objects this example allocated with new: the DML
* group, the schema, and finally the connection.
**********************************************/
 
cout << "Deleting Objects" << endl;
delete dmlGr;
delete schema;
delete conn;
 
cout << "*** Stream Complete ***" << endl;