*********************************************************************** * Copyright 2006-2009, 2011 by Teradata Corporation. * All Rights Reserved. * TERADATA CONFIDENTIAL AND TRADE SECRET * * This copyrighted material is the Confidential, Unpublished * Property of the Teradata Corporation. This copyright notice and * any other copyright notices included in machine readable * copies must be reproduced on all authorized copies. * ***********************************************************************/ #include "GenericDriver.h" #include "OptionsManager.h" #include <stdio.h> #include <stdlib.h> #include <time.h> #include "string.h" #include <vector> #define MAX 10 using namespace teradata::client::API; GenericDriver::GenericDriver(OptionsManager * om) { inputStream = NULL; outputStream = NULL; inputFileName = NULL; outputFileName = NULL; m_om = om; } GenericDriver::~GenericDriver(void) { if( inputFileName != NULL ) { free(inputFileName); } if( outputFileName != NULL ) { free(outputFileName); } } /***************************************************/ /* testLoad */ /* */ /* Tests transferring data from Export's GetBuffer */ /* to Load's PutBuffer. Uses the following steps: */ /* */ /* 1. Setup and Initiate Load driver */ /* 2. Query buffer layout from Load */ /* 3. Pass Load buffer layout to Export driver */ /* 4. Initiate Export driver */ /* 5. Transfer buffers from GetBuffer to PutBuffer */ /* 6. Call Load's EndAcquisition & ApplyRows */ /* 7. Terminate Load and Export */ /***************************************************/ void GenericDriver::testExportToLoad() { time_t now; time(&now); int ldStatus = 0; int expStatus = 0; int queryStatus = 0; char *errorMsg = NULL; TD_ErrorType errorType; RowCounts ldCount; /* RowCounts struct defined in GetBuffer.h */ char *dataptr; TD_Length datalen; char *msg = (char*)malloc(256); /********************************************** * Retrieve Options From Option File **********************************************/ int numSessions = m_om -> getNumSessions(); char *tdp_id = strdup(m_om -> getTdpId().c_str()); char *user_name = strdup(m_om -> getUserName().c_str()); char *user_password = strdup(m_om -> getUserPassword().c_str()); char *target_table = strdup(m_om -> getTargetTable().c_str()); char *target_table_export = strdup(m_om -> getTargetTableExport().c_str()); char *log_table = strdup(m_om -> getLogTable().c_str()); char *error_table_1 = strdup(m_om -> getErrorTable1().c_str()); char *error_table_2 = strdup(m_om -> getErrorTable2().c_str()); char *ins_statement = (char*)malloc(1024); char *sel_statement = (char*)malloc(1024); sprintf(ins_statement, "INSERT INTO %s;",target_table); //sprintf(ins_statement, "INSERT INTO %s( :col1, :col2);",target_table); sprintf(sel_statement, "SELECT * FROM %s;", target_table_export); cout << "######################### OUTPUT START #########################" \ << endl; cout << "\nTest Description :GenericDriver sample" << endl; cout << "Date :" << ctime(&now); Connection *ldConn = new Connection(); /* Load */ Connection *expConn = new Connection(); /* Export */ /********************************************** * Set Operator Type and Trace/Log Levels **********************************************/ ldConn -> AddAttribute(TD_SYSTEM_OPERATOR, TD_LOAD); expConn -> AddAttribute(TD_SYSTEM_OPERATOR, TD_EXPORT); expConn -> AddAttribute(TD_TRACE_OUTPUT, "exportL.txt"); expConn -> AddArrayAttribute(TD_TRACE_LEVEL, 2, TD_OPER_ALL,\ TD_GENERAL, NULL); ldConn -> AddAttribute(TD_TRACE_OUTPUT, "load.txt"); ldConn -> AddArrayAttribute(TD_TRACE_LEVEL, 2, TD_OPER_ALL,\ TD_GENERAL, NULL); /********************************************** * Add Attributes **********************************************/ ldConn -> AddAttribute(TD_TDP_ID, tdp_id); ldConn -> AddAttribute(TD_USER_NAME, user_name); ldConn -> AddAttribute(TD_USER_PASSWORD, user_password); ldConn -> AddAttribute(TD_TARGET_TABLE, target_table); ldConn -> AddAttribute(TD_LOG_TABLE, log_table); ldConn -> AddAttribute(TD_ERROR_TABLE_1, error_table_1); ldConn -> AddAttribute(TD_ERROR_TABLE_2, error_table_2); ldConn -> AddAttribute(TD_MAX_SESSIONS, numSessions); ldConn -> AddAttribute(TD_MIN_SESSIONS, numSessions); ldConn -> AddAttribute(TD_WILDCARDINSERT, "Yes"); //ldConn -> AddAttribute(TD_CHARSET, "UTF8"); //ldConn -> AddAttribute(TD_CHARSET, "UTF16"); expConn -> AddAttribute(TD_TDP_ID, tdp_id); expConn -> AddAttribute(TD_USER_NAME, user_name); expConn -> AddAttribute(TD_USER_PASSWORD, user_password); expConn -> AddAttribute(TD_SELECT_STMT, sel_statement); expConn -> AddAttribute(TD_MAX_SESSIONS, numSessions); expConn -> AddAttribute(TD_MIN_SESSIONS, numSessions); expConn -> AddAttribute(TD_RETN_ACT_DATATYPE, "FOO"); //expConn -> AddAttribute(TD_CHARSET, "UTF8"); //expConn -> AddAttribute(TD_CHARSET, "UTF16"); /********************************************** * Add Schema **********************************************/ Schema* schema = NULL; cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; cout << " EXPORT OPERATOR CONNECTION INFO " << endl; cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; cout << "Operator type :EXPORT" << endl; cout << "TDP ID :" << tdp_id << endl; cout << "TDP username :" << user_name << endl; cout << "TDP password :<not displayable>" << endl; cout << "TargetTable :" << target_table_export << endl; cout << "DML Statement :" << sel_statement << endl; /********************************************** * Initiate Export Driver **********************************************/ cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; cout << " INITIATE PHASE " << endl; cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; expStatus = expConn -> Initiate(); if ( expStatus >= TD_Error ) { //Get Error Information cout << "Error occured during Export Initiate" << endl; expConn -> GetErrorInfo(&errorMsg, &errorType); if ( errorMsg != NULL ) { cout << errorMsg << endl; cout << "Type: " << errorType << endl; } else { cout << "No Error Info Available" << endl; } } else { cout << "Export Driver initiated Successfully" << endl; /********************************************** * Add Schema for load **********************************************/ cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; cout << " COLUMN INFO " << endl; cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; Schema* dynamicSchema; int returnValue = expConn -> GetSchema(&dynamicSchema); ldConn -> AddSchema(dynamicSchema); if ( returnValue < TD_Error && returnValue != TD_Unavailable ) { int Count = dynamicSchema -> GetColumnCount(); vector<Column*> column(MAX); cout << "Column Count:" << Count << endl; for (vector<Column*>::size_type i=0; i < Count; i++) { column[i] = dynamicSchema -> GetColumn(i); char * Name = column[i] -> GetName(); TD_DataType dataType = (TD_DataType)column[i] -> GetDataType(); int precision = column[i] -> GetPrecision(); int scale = column[i] -> GetScale(); cout << endl; cout << "Column:" << i+1 << " Info" <<endl; cout << "Name:" << Name << endl; cout << "DataType:" << dataType << endl; cout << "Precision:" << precision << endl; cout << "scale:" << scale << endl; } } else { printf(" GetSchema failed with status != %d", returnValue); } /********************************************** * Query Version Number **********************************************/ queryStatus = expConn -> GetEvent(TD_Evt_Version, &dataptr, &datalen); cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; cout << " VERSION INFO " << endl; cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; if ( queryStatus < TD_Error && queryStatus != TD_Unavailable ) { cout << "TPTAPI Version :"; cout << *((char**)dataptr) << endl; cout << "Operator Version:"; cout << *((char**)(dataptr+sizeof(char**))) << endl; } else { printStatus("TD_Evt_Version event failed with status ", queryStatus); } /********************************************** * Add DMLGroups **********************************************/ TD_Index dmlGroupIndex = 0; DMLGroup *dmlGr = new DMLGroup(); dmlGr -> AddStatement(ins_statement); ldConn -> AddDMLGroup(dmlGr, &dmlGroupIndex); cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; cout << " LOAD OPERATOR CONNECTION INFO " << endl; cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; cout << "Operator type :LOAD" << endl; cout << "TDP ID :" << tdp_id << endl; cout << "TDP username :" << user_name << endl; cout << "TDP password :<not displayable>" << endl; cout << "Target Table :" << target_table << endl; cout << "DML Statement :" << ins_statement << endl; /********************************************** * Initiate Load Driver **********************************************/ cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; cout << " INITIATE PHASE " << endl; cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; ldStatus = ldConn -> Initiate(); if ( ldStatus < TD_Error ) { cout << "Load Driver initiated Successfully" << endl; /********************************************** * Query Version Number **********************************************/ queryStatus = ldConn -> GetEvent(TD_Evt_Version, &dataptr, &datalen); cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; cout << " VERSION INFO " << endl; cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; if ( queryStatus < TD_Error && queryStatus != TD_Unavailable ) { cout << "TPTAPI Version :"; cout << *((char**)dataptr) << endl; cout << "Operator Version:"; cout << *((char**)(dataptr+sizeof(char**))) << endl; } else { printStatus("TD_Evt_Version event failed with status ", queryStatus) } /********************************************** * Acquisition **********************************************/ cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; cout << " ACQUISITION PHASE " << endl; cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; char* dataBuffer; TD_Length dataLen = 0; int count = 0; int transferDone = 0; while(!transferDone) { /* Get a buffer from Export */ expStatus = expConn -> GetRow(&dataBuffer, &dataLen); if ( expStatus >= TD_Error ) { sprintf(msg,"GetBuffer failed on buffer %d with status ", count+1); printStatus(msg, expStatus); transferDone = 1; } else if ( expStatus == TD_END_Method ) { cout << "End of Data Reached" << endl; transferDone = 1; } else { /* Pass Buffer to Load */ ldStatus = ldConn -> PutRow(dataBuffer, dataLen); if ( ldStatus != TD_Success ) { sprintf(msg,"PutBuffer failed on buffer %d with status ", count+1); printStatus(msg, ldStatus); transferDone = 1; } else { count++; /* Get Load Row Counts */ } } } if ( ldStatus < TD_Error && expStatus < TD_Error ) { /********************************************** * End Acquisition **********************************************/ ldStatus = ldConn -> EndAcquisition(); if ( ldStatus < TD_Error ) { cout << "Acquisition completed Successfully" << endl; /* Get Load Row Counts */ queryStatus = ldConn -> GetEvent(TD_Evt_RowCounts, &dataptr, \ &datalen); if ( queryStatus < TD_Error && queryStatus != TD_Unavailable ) { ldCount = *((RowCounts*)dataptr); cout << "Rows Received :"; cout << ldCount.RowsReceived << endl; cout << "Rows Sent :"; cout << ldCount.RowsSent << endl; cout << "Rows Applied :"; cout << ldCount.RowsApplied << endl; } else { cout << "TD_Evt_RowCounts Info Not Available" << \ queryStatus << endl; } /********************************************** * Application **********************************************/ cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; cout << " APPLICATION PHASE " << endl; cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; ldStatus = ldConn -> ApplyRows(); if ( ldStatus < TD_Error ) { cout << "Rows Applied Successfully" << endl; cout << "Transfer completed successfully" << endl; cout << "Transferred " << count << " buffers" << endl; /* Get Export Row Count */ queryStatus = expConn -> GetEvent(TD_Evt_ExportCount,\ &dataptr, &datalen); if ( queryStatus < TD_Error && queryStatus != TD_Unavailable ) { cout << "TD_Evt_ExportCount:"; cout << *((int*)dataptr) << endl; } else { printStatus("TD_Evt_ExportCount event failed with status", \ queryStatus); } /* Get Load Apply Count */ queryStatus = ldConn -> GetEvent(TD_Evt_ApplyCount, &dataptr, \ &datalen); if ( queryStatus < TD_Error && queryStatus != TD_Unavailable ) { cout << "TD_Evt_ApplyCount :"; cout << *((int*)dataptr) << endl; } else { printStatus("TD_Evt_ApplyCount event failed with status", \ queryStatus); } } else { //Get Error Information cout << "Error occured during Application" << endl; ldConn -> GetErrorInfo(&errorMsg, &errorType); if ( errorMsg != NULL ) { cout << errorMsg << endl; cout << "Type: " << errorType << endl; } else { cout << "No Error Info Available" << endl; } } } else { //Get Error Information cout << "Error occured during EndAcquisition" << endl; ldConn -> GetErrorInfo(&errorMsg, &errorType); if ( errorMsg != NULL ) { cout << errorMsg << endl; cout << "Type: " << errorType << endl; } else { cout << "No Error Info Available" << endl; } } } else { //Get Error Information if ( ldStatus >= TD_Error ) { cout << "Load Error occured during Acquisition" << endl; ldConn -> GetErrorInfo(&errorMsg, &errorType); if ( errorMsg != NULL ) { cout << errorMsg << endl; cout << "Type: " << errorType << endl; } else { cout << "No Error Info Available" << endl; } } if ( expStatus >= TD_Error ) { cout << "Export Error occured during Acquisition" << endl; expConn -> GetErrorInfo(&errorMsg, &errorType); if ( errorMsg != NULL ) { cout << errorMsg << endl; cout << "Type: " << errorType << endl; } else { cout << "No Error Info Available" << endl; } } } } else { //Get Error Information cout << "Error occured during Load Initiate" << endl; ldConn -> GetErrorInfo(&errorMsg, &errorType); if ( errorMsg != NULL ) { cout << errorMsg << endl; cout << "Type: " << errorType << endl; } else { cout << "No Error Info Available" << endl; } } /* Terminate Drivers **********************************************/ cout << "\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; cout << " TERMINATE PHASE " << endl; cout << "%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%" << endl; ldStatus = ldConn -> Terminate(); expStatus = expConn -> Terminate(); if ( ldStatus >= TD_Error || expStatus >= TD_Error ) { /* Get Error Information */ if ( ldStatus >= TD_Error ) { cout << "Error occured during Load Terminate" << endl; ldConn -> GetErrorInfo(&errorMsg, &errorType); if ( errorMsg != NULL ) { cout << errorMsg << endl; cout << "Type: " << errorType << endl; } else { cout << "No Error Info Available" << endl; } } else { cout << "Load Terminated Successfully" << endl; } if ( expStatus >= TD_Error ) { cout << "Error occured during Export Terminate" << endl; expConn -> GetErrorInfo(&errorMsg, &errorType); if ( errorMsg != NULL ) { cout << errorMsg << endl; cout << "Type: " << errorType << endl; } else { cout << "No Error Info Available" << endl; } } else { cout << "Export Terminated Successfully" << endl; } } /********************************************** * Clean Up **********************************************/ cout << "Deleting objects" << endl; delete dmlGr; delete dynamicSchema; delete ldConn; delete expConn; free(user_name); free(user_password); free(target_table); free(target_table_export); free(tdp_id); free(log_table); free(error_table_1); free(error_table_2); free(ins_statement); free(sel_statement); free(msg); cout << "*** Export to Load Complete ***" << endl; cout <<"\n######################## OUTPUT END ####################" << endl; } } /*************************** Utility Functions *******************************/ /*********************************************** * Function prints string version of status code ************************************************/ void GenericDriver::printStatus(char* msg, int code) { cout << msg; switch(code) { case TD_Success: cout << "TD_Success"; break; case TD_SYNC_Barrier: cout << "TD_SYNC_Barrier"; break; case TD_SYNC_TELINFO: cout << "TD_SYNC_TELINFO"; break; case TD_END_Method: cout << "TD_END_Method"; break; case TD_Unavailable: cout << "TD_Unavailable"; break; case TD_Error: cout << "Error Code: TD_Error"; break; default: cout << "Error Code: " << code; break; } cout << endl; } /*************************************** * Function sets input file name ***************************************/ void GenericDriver::setInput(char* fname) { if( inputFileName != NULL ) { delete inputFileName; } inputFileName = strdup(fname); } /*************************************** * Function sets output file name ***************************************/ void GenericDriver::setOutput(char* fname) { if( outputFileName != NULL ) { delete outputFileName; } outputFileName = strdup(fname); } /*************************************** * Function reads in a row of data ***************************************/ int GenericDriver::GetRowData(char* rowBuffer, size_t rowLength) { unsigned short rlength; //Close File if End Signal Received if ( rowLength <= 0 ) { if ( inputStream != NULL ) { fclose(inputStream); } return -1; } //Open File if not already open if( inputStream == NULL ) { inputStream = fopen(inputFileName, "rb"); if( inputStream == NULL ) { cout << "Error opening file " << inputFileName << endl; return -1; } } //Read the lenght of each record..read first two bytes size_t recLen =fread(rowBuffer, 1, (size_t)2, inputStream); if ( recLen < 1 ) { if ( feof(inputStream) ) { cout << "End of file reached" << endl; if( inputStream != NULL ) { fclose(inputStream); } return -1; } else { /* return some read error here and exit */ cout << "read error" << endl; if( inputStream != NULL ) { fclose(inputStream); } return -1; } } //read row of data with size rlength memcpy(&rlength, rowBuffer, sizeof(unsigned short)); /* DR124305 */ size_t recoLen = fread(rowBuffer+2,1,(size_t)rlength+1,inputStream); return 0; } /***************************************************** * Function writes out a row of data in indicator mode ******************************************************/ int GenericDriver::WriteRowData(char* data, TD_Length dataLen) { if( data != NULL ) { //Open File if not already open if( outputStream == NULL ) { outputStream = fopen(outputFileName, "wb"); if( outputStream == NULL ) { cout << "Error opening file " << outputFileName << endl; return -1; } } unsigned long i = 0; unsigned short shortdataLen = (unsigned short)dataLen; char* rowBuffer = (char*)malloc(dataLen+20); memcpy(rowBuffer, &shortdataLen, 2); for(i = 0; i < dataLen; i++) { rowBuffer[i+2] = data[i]; } rowBuffer[i+2] = 0x0a; //fprintf (stream, "%s\n",rowBuffer+1); // account for indicator byte fwrite(rowBuffer, dataLen+3, 1, outputStream); free(rowBuffer); } else { if( outputStream != NULL ) { fclose(outputStream); } return -1; } return 0; } /***************************************************** * Function writes out a buffer of data in indicator mode ******************************************************/ int GenericDriver::WriteBufferData(char* data, TD_Length dataLen) { if( data != NULL ) { //Open File if not already open if( outputStream == NULL ) { outputStream = fopen(outputFileName, "wb"); if( outputStream == NULL ) { cout << "Error opening file " << outputFileName << endl; return -1; } } fwrite(data, dataLen, 1, outputStream); } else { { fclose(outputStream); } return -1; } return 0; }