/** Kmeans2 is a wrapper that uses the Aster-Spark API to run the Spark MLlib
  * K-means clustering algorithm on Aster data.
  *
  * It extends `AsterSparkFunctionDR` and overrides its `run` method with the
  * Spark MLlib K-means implementation: a model is trained on a random sample
  * of the input, then every input row is scored against that model.
  */
class Kmeans2(args: Array[String], name: String, mstr: String = null)
  extends AsterSparkFunctionDR(args, name, mstr) {

  /** Runs Spark MLlib K-means over the input data.
    *
    * @param input the input RDD reading data from the source, one `DataRow`
    *              per record. The first field of each row is treated as a key
    *              and excluded from clustering (`row.tail` is vectorized).
    * @param sparkFunctParams function-specific parameter string; for this
    *                         function it is a single numeric value giving the
    *                         sampling fraction used to build the model.
    *                         NOTE(review): parsed with `toDouble` — a
    *                         non-numeric value will throw
    *                         `NumberFormatException`.
    * @return an RDD containing every input row with the predicted cluster
    *         number appended as an extra trailing field.
    */
  override def run(input: RDD[DataRow], sparkFunctParams: String): RDD[DataRow] = {
    // Fraction of the input used to train the model.
    val sampleFraction = sparkFunctParams.trim().toDouble

    // Sample without replacement, with a fixed seed (10) so the model-building
    // subset — and therefore the model — is reproducible across runs.
    val sampleSet = input.sample(false, sampleFraction, 10)

    // Drop the leading key field; cluster only on the feature columns.
    val trainingVectors = sampleSet.map(row => getDenseVector(row.tail))
    val model = new KMeans().run(trainingVectors)

    // The model was built on the sample; score the FULL input with it,
    // appending the predicted cluster id as a new field on each row.
    input.map(row => row :+ model.predict(getDenseVector(row.tail)).toDataField)
  }
}
Example query invoking Kmeans2 through the RunOnSpark function (0.5 is the sampling fraction passed as the function parameter):
SELECT cluster, count(*), min(v1), max(v1), min(v2), max(v2) FROM RunOnSpark ( ON (SELECT i , v1 , v2 FROM points) SPARKCODE ('com.teradata.aster.functions.Kmeans2 0.5') OUTPUTS ('key int', 'v1 double precision', 'v2 double precision', 'cluster int') SPARK_CLUSTER_ID ('spark1-yarn-socket') ) sp GROUP BY cluster ORDER BY cluster;