package com;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class Kmeans {
// static List
// static int K;
// static int dataBeginIndex;
public static class KmeansMapper extends
Mapper
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
List
.getConfiguration().get("centersPath"));
int dataBeginIndex = Integer.parseInt(context.getConfiguration()
.get("dtBegIdxPath"));
int K = Integer.parseInt(context.getConfiguration().get("KPath"));
double minDistance = 99999999;
int centerIndex = K;
for (int i = 0; i < K; i++) {
double currentDistance = 0;
for (int j = dataBeginIndex; j < fields.length; j++) {
double t1 = Math.abs(centers.get(i).get(j));
double t2 = Math.abs(Double.parseDouble(fields[j]));
currentDistance += Math.pow((t1 - t2) / (t1 + t2), 2);
}
Help.debug(currentDistance, "currentDistance");
if (minDistance > currentDistance) {
minDistance = currentDistance;
centerIndex = i;
}
}
IntWritable centerId = new IntWritable(centerIndex+1);
Text tValue = new Text();
tValue.set(value);
context.write(centerId, tValue);
}
}
public static class KmeansReducer extends
Reducer
public void reduce(IntWritable key, Iterable
Context context) throws IOException, InterruptedException { List
String tmpResult = "";
for (Text val : values) {
String line = val.toString();
String[] fields = line.split(",");
List
for (int i = 0; i < fields.length; i++) {
tmpList.add(Double.parseDouble(fields[i]));
}
helpList.add((ArrayList
}
// System.out.println(helpList.size());
// for(int i=0;i // System.out.println(helpList.get(i)); for (int i = 0; i < helpList.get(0).size(); i++) { double sum = 0; for (int j = 0; j < helpList.size(); j++) { sum += helpList.get(j).get(i); } double t = sum / helpList.size(); if (i == 0) tmpResult += t; else tmpResult += "," + t; } Text result = new Text(); result.set(tmpResult); int tmpKey = Integer.parseInt(key.toString()); context.write(new IntWritable(tmpKey), result); } } static void runKmeans(String[] args, boolean isReduce) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 7) { System.err .println("Usage: Kmeans System.exit(2); } conf.setStrings("centersPath", otherArgs[3]); conf.setStrings("dtBegIdxPath", otherArgs[5]); conf.setStrings("KPath", otherArgs[6]); Job job = new Job(conf, "kmeans"); job.setJarByClass(Kmeans.class); job.setMapperClass(KmeansMapper.class); job.setNumReduceTasks(Integer.parseInt(args[6])); // 判断是否需要执行Reduce if (isReduce) { job.setReducerClass(KmeansReducer.class); } job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // delete last result Help.deleteLastResult(otherArgs[1]); // System.exit(job.waitForCompletion(true)?0:1); job.waitForCompletion(true); } /** * * @param in * - args[0] out - args[1] localOriginalCentersPath - args[2] * oldCentersPath - args[3] newCentersPath - args[4] * dataBeginIndex - args[5] K - args[6] * @throws Exception */ public static void main(String[] args) throws Exception { Help.deleteLastResult(args[3]); Help.copyOriginalCenters(args[2], args[3]); int count=1; // runKmeans(args, true); while (true) { System.out.println("迭代的轮次:"+count++); runKmeans(args, true); if (Help.isFinished(args[3], args[4], args[6], args[5], 0.0)) { runKmeans(args, false); break; } if(count==20) { runKmeans(args, false); break; } } } } 输入输出路径:hdfs://master:9000/kmeans/input/wine.txt hdfs://master:9000/kmeans/out /home/hadoop/oldCenterData.txt hdfs://master:9000/kmeans/oldCenter hdfs://master:9000/kmeans/out/part-r-0000 1 3