文档库 最新最全的文档下载
当前位置:文档库 › 基于MapReduce的Kmeans算法代码及其使用

基于MapReduce的Kmeans算法代码及其使用

package com;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class Kmeans {

// static List<ArrayList<Double>> centers ;

// static int K;

// static int dataBeginIndex;

public static class KmeansMapper extends

Mapper {

public void map(Object key, Text value, Context context)

throws IOException, InterruptedException {

String line = value.toString();

String[] fields = line.split(",");

List> centers = Help.getOldCenters(context

.getConfiguration().get("centersPath"));

int dataBeginIndex = Integer.parseInt(context.getConfiguration()

.get("dtBegIdxPath"));

int K = Integer.parseInt(context.getConfiguration().get("KPath"));

double minDistance = 99999999;

int centerIndex = K;

for (int i = 0; i < K; i++) {

double currentDistance = 0;

for (int j = dataBeginIndex; j < fields.length; j++) {

double t1 = Math.abs(centers.get(i).get(j));

double t2 = Math.abs(Double.parseDouble(fields[j]));

currentDistance += Math.pow((t1 - t2) / (t1 + t2), 2);

}

Help.debug(currentDistance, "currentDistance");

if (minDistance > currentDistance) {

minDistance = currentDistance;

centerIndex = i;

}

}

IntWritable centerId = new IntWritable(centerIndex+1);

Text tValue = new Text();

tValue.set(value);

context.write(centerId, tValue);

}

}

public static class KmeansReducer extends

Reducer {

public void reduce(IntWritable key, Iterable values,

Context context) throws IOException, InterruptedException { List> helpList = new ArrayList>();

String tmpResult = "";

for (Text val : values) {

String line = val.toString();

String[] fields = line.split(",");

List tmpList = new ArrayList();

for (int i = 0; i < fields.length; i++) {

tmpList.add(Double.parseDouble(fields[i]));

}

helpList.add((ArrayList) tmpList);

}

// System.out.println(helpList.size());

// for(int i=0;i

// System.out.println(helpList.get(i));

for (int i = 0; i < helpList.get(0).size(); i++) {

double sum = 0;

for (int j = 0; j < helpList.size(); j++) {

sum += helpList.get(j).get(i);

}

double t = sum / helpList.size();

if (i == 0)

tmpResult += t;

else

tmpResult += "," + t;

}

Text result = new Text();

result.set(tmpResult);

int tmpKey = Integer.parseInt(key.toString());

context.write(new IntWritable(tmpKey), result);

}

}

/**
 * Configures and runs one K-means MapReduce job and waits for it to finish.
 *
 * <p>Expects exactly seven arguments after Hadoop's generic options:
 * in, out, localOriginalCentersPath, oldCentersPath, newCentersPath,
 * dataBeginIndex, K. Any previous result at the output path is removed via
 * {@code Help.deleteLastResult} before the job starts.
 *
 * @param args command-line arguments, possibly including generic options
 * @param isReduce when true {@link KmeansReducer} is installed; when false
 *        no reducer class is set on the job
 * @throws IOException if job setup fails or the job does not succeed
 * @throws ClassNotFoundException propagated from the job
 * @throws InterruptedException if waiting for the job is interrupted
 */
static void runKmeans(String[] args, boolean isReduce) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();
    if (otherArgs.length != 7) {
        System.err
                .println("Usage: Kmeans <in> <out> <localOriginalCentersPath> "
                        + "<oldCentersPath> <newCentersPath> <dataBeginIndex> <K>");
        System.exit(2);
    }

    // Settings the mapper reads back from the job configuration.
    conf.set("centersPath", otherArgs[3]);
    conf.set("dtBegIdxPath", otherArgs[5]);
    conf.set("KPath", otherArgs[6]);

    Job job = new Job(conf, "kmeans");
    job.setJarByClass(Kmeans.class);
    job.setMapperClass(KmeansMapper.class);
    // Use the parsed arguments: the raw args array is shifted whenever
    // generic options (-D, -files, ...) precede the program arguments.
    job.setNumReduceTasks(Integer.parseInt(otherArgs[6]));

    // Install the averaging reducer only when the caller asks for it.
    if (isReduce) {
        job.setReducerClass(KmeansReducer.class);
    }

    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    // Remove the previous result so the job can write to the output path.
    Help.deleteLastResult(otherArgs[1]);

    // Fail loudly instead of letting the caller continue after a failed job.
    if (!job.waitForCompletion(true)) {
        throw new IOException("kmeans job failed for output path "
                + otherArgs[1]);
    }
}

/**
 * Drives the iterative K-means computation.
 *
 * <p>Program arguments (after any Hadoop generic options):
 * <ol start="0">
 * <li>in - input data path</li>
 * <li>out - job output path</li>
 * <li>localOriginalCentersPath - initial centers file</li>
 * <li>oldCentersPath - centers used by the current round</li>
 * <li>newCentersPath - centers produced by the current round</li>
 * <li>dataBeginIndex - index of the first field used for the distance</li>
 * <li>K - number of clusters</li>
 * </ol>
 *
 * <p>Each round runs the job with the reducer and then asks
 * {@code Help.isFinished}; the loop stops when that returns true or the
 * round limit is reached. A final job without the reducer is then run.
 *
 * @param args command-line arguments
 * @throws Exception if any helper call or job fails
 */
public static void main(String[] args) throws Exception {
    // Strip generic options so the positional indexes below line up with
    // the ones runKmeans uses, and check the count before indexing.
    String[] otherArgs = new GenericOptionsParser(new Configuration(), args)
            .getRemainingArgs();
    if (otherArgs.length != 7) {
        System.err
                .println("Usage: Kmeans <in> <out> <localOriginalCentersPath> "
                        + "<oldCentersPath> <newCentersPath> <dataBeginIndex> <K>");
        System.exit(2);
    }

    Help.deleteLastResult(otherArgs[3]);
    Help.copyOriginalCenters(otherArgs[2], otherArgs[3]);

    // The original loop stopped once its counter reached 20, i.e. after
    // 19 rounds; that limit is kept unchanged here.
    final int maxRounds = 19;
    for (int round = 1; round <= maxRounds; round++) {
        System.out.println("迭代的轮次:" + round);
        // Pass the untouched args: runKmeans parses generic options itself.
        runKmeans(args, true);
        if (Help.isFinished(otherArgs[3], otherArgs[4], otherArgs[6],
                otherArgs[5], 0.0)) {
            break;
        }
    }

    // Final pass without the averaging reducer.
    runKmeans(args, false);
}

}

输入输出路径及运行参数(依次为:输入路径、输出路径、本地初始中心文件、旧中心路径、新中心路径、数据起始列下标、K):hdfs://master:9000/kmeans/input/wine.txt hdfs://master:9000/kmeans/out /home/hadoop/oldCenterData.txt hdfs://master:9000/kmeans/oldCenter hdfs://master:9000/kmeans/out/part-r-00000 1 3

相关文档
相关文档 最新文档