package examples;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import com.teradata.fnc.TeradataType;
import com.teradata.fnc.operator.MetaData;
import com.teradata.fnc.operator.TableOperator;
import com.teradata.fnc.operator.TeradataResultSet;
import com.teradata.fnc.runtime.ColumnDefinition;
import com.teradata.fnc.runtime.RuntimeContract;
import com.teradata.fnc.value.MismatchException;
import com.teradata.fnc.value.RangeException;
import com.teradata.fnc.value.Value;
/**
 * Table operator that copies every input row to the output and appends an
 * integer {@code SessionID} column.
 *
 * <p>The session id is incremented whenever the timestamp of the current row
 * is more than the configured timeout later than the timestamp of the
 * previous row. The timestamp column name and the timeout are read from the
 * custom input info of the contract under the keys {@code "TimeCol"} and
 * {@code "timeout"}.
 *
 * <p>NOTE(review): the session logic compares each row only with the row read
 * immediately before it, so it presumably expects the input to arrive ordered
 * by the time column - confirm against the calling SQL.
 */
public class Sessionize implements TableOperator{

    /** SQLSTATE attached to the exceptions raised for an invalid timeout value. */
    private static final String INVALID_TIMEOUT_SQLSTATE = "T0002";

    /** Multiplier turning the timeout value into the millisecond scale of {@link Timestamp#getTime()}. */
    private static final long MILLIS_PER_TIMEOUT_UNIT = 1000L;

    /**
     * Declares the output schema: one output column per input column (same
     * name and Teradata type, with the type-dependent attributes copied over)
     * followed by an additional INTEGER column named {@code SessionID}.
     *
     * @param contract contract on which the output columns are registered and
     *                 which is then completed
     * @param rsin     input result sets; only {@code rsin[0]} is inspected
     * @param arg2     not used by this method
     * @return always {@code 1}
     * @throws SQLException if reading the input metadata or updating the
     *                      contract fails
     */
    public int contract(RuntimeContract contract, ResultSet[] rsin, ResultSet[] arg2)
            throws SQLException {
        MetaData iCols = ((TeradataResultSet) rsin[0]).getTeradataMetaData();
        int inputCount = iCols.getColumnCount();

        /* One slot per input column plus one for the SessionID column. */
        ColumnDefinition[] outCols = new ColumnDefinition[inputCount + 1];

        /* Copy input columns to output columns (metadata indexes are 1-based). */
        for (int col = 0; col < inputCount; col++) {
            int src = col + 1;
            outCols[col] = new ColumnDefinition(iCols.getColumnName(src), iCols.getTeradataColumnType(src));
            outCols[col].setDisplayLength(iCols.getColumnDisplaySize(src));
            switch (TeradataType.get(iCols.getTeradataColumnType(src))) {
            case VARCHAR_DT:
            case CHAR_DT:
                /* NOTE(review): the charset is populated from getPrecision(); confirm this is the intended accessor. */
                outCols[col].setCharset(iCols.getPrecision(src));
                break;
            case VARBYTE_DT:
                /* NOTE(review): same charset-from-precision pattern as above; the display
                 * length is set a second time here, kept as in the original - confirm whether needed. */
                outCols[col].setCharset(iCols.getPrecision(src));
                outCols[col].setDisplayLength(iCols.getColumnDisplaySize(src));
                break;
            case TIME_DT:
            case TIMESTAMP_DT:
            case TIME_WTZ_DT:
            case TIMESTAMP_WTZ_DT:
                outCols[col].setPrecision(iCols.getPrecision(src));
                break;
            default:
                outCols[col].setPrecision(iCols.getPrecision(src));
                outCols[col].setScale(iCols.getScale(src));
                break;
            }
        }

        /* Add SessionID column in the last slot. */
        outCols[inputCount] = new ColumnDefinition("SessionID", TeradataType.INTEGER_DT);
        outCols[inputCount].setDisplayLength(4);

        /* Set output columns and complete the contract. */
        contract.setOutputInfo(0, outCols);
        contract.complete();
        return 1;
    } /* contract */

    /**
     * Streams the rows of {@code rsin[0]} into {@code rsout[0]}, appending the
     * current session id as the last column of every row.
     *
     * <p>The session id starts at 0 and is incremented before a row is written
     * whenever that row's timestamp is greater than the previous row's
     * timestamp plus the window (timeout value multiplied by 1000). For the
     * first row the previous timestamp is taken as 0.
     *
     * @param contract contract whose custom input info supplies the
     *                 {@code "TimeCol"} and {@code "timeout"} values
     * @param rsin     input result sets; only {@code rsin[0]} is read
     * @param rsout    output result sets; rows are inserted into {@code rsout[0]}
     * @throws SQLException if reading or writing a row fails, if the time
     *                      column holds a NULL value, or (with SQLSTATE
     *                      {@code T0002}) if a custom value raises a
     *                      {@code MismatchException} or {@code RangeException}
     */
    public void execute(RuntimeContract contract, ResultSet[] rsin, ResultSet[] rsout)
            throws SQLException {
        try {
            int currentSessionId = 0;
            long lastTime = 0;
            String timeColumn = (String) ((Value) contract.getInputInfo().getCustom().get("TimeCol")).getObject();
            byte timeValue = ((Value) contract.getInputInfo().getCustom().get("timeout")).getByte();
            long window = timeValue * MILLIS_PER_TIMEOUT_UNIT;

            ResultSet in = rsin[0];
            ResultSet out = rsout[0];
            int colcount = in.getMetaData().getColumnCount();

            /* Copy user data adding a sessionization column. */
            while (in.next()) {
                Timestamp currentTime = (Timestamp) in.getObject(timeColumn);
                if (currentTime == null) {
                    /* Fail with a clear message instead of a bare NullPointerException. */
                    throw new SQLException("NULL timestamp in time column " + timeColumn);
                }
                long currentMillis = currentTime.getTime();

                /* Determine if time of this click is more than the window after the last. */
                if (currentMillis > lastTime + window) {
                    currentSessionId++;
                }

                for (int i = 1; i <= colcount; i++) {
                    Object o = in.getObject(i);
                    if (in.wasNull()) {
                        out.updateObject(i, null);
                    } else {
                        out.updateObject(i, o);
                    }
                }

                /* Add session info. */
                out.updateObject(colcount + 1, currentSessionId);
                out.insertRow();
                lastTime = currentMillis;
            }
        } catch (MismatchException e1) {
            throw invalidTimeout("Invalid timeout value numeric", e1);
        } catch (RangeException e1) {
            throw invalidTimeout("Invalid timeout value 1-30", e1);
        }
    } /* execute */

    /**
     * Builds the exception reported for a bad custom value: {@code reason}
     * becomes the message, the SQLSTATE is {@code T0002}, and the original
     * failure is kept as the cause.
     */
    private static SQLException invalidTimeout(String reason, Throwable cause) {
        SQLException ex = new SQLException(reason, INVALID_TIMEOUT_SQLSTATE);
        ex.initCause(cause);
        return ex;
    }
}