Q: Condition on MapReduce counters to control the map output

Is there any way to put a condition on a user-defined Java counter at the mapper level to control the mapper output?

    long l = context.getCounter(Counters.COUNT).getValue();

    if (5L >= l) {
        context.getCounter(Counters.COUNT).increment(1);
        context.write((LongWritable) key, value);
    } else {
        System.out.println("MAP ELSE");
        return;
    }

I am getting more than five records as input to the reducer. Is there any way to control this?

Answer 1:

You cannot do that. If your input file has 3 splits, 3 mappers will run. Each mapper has its own count value (which depends on how your logic increments it), and the combined value is only known on the reduce side, once all the mappers have completed and the shuffle phase is over.
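To make the effect concrete, here is a minimal plain-Java sketch that models three map tasks, each with its own local count. It does not use the Hadoop API: `PerTaskCounterDemo`, `runMapTask`, and `totalEmitted` are hypothetical names invented for this illustration, and the model assumes each task only ever sees its own increments.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of map tasks: NOT the Hadoop API. Each task keeps its own
// local count, on the assumption that a mapper only sees its own increments.
class PerTaskCounterDemo {

    // Hypothetical stand-in for one map task processing one input split.
    static List<String> runMapTask(List<String> split) {
        long localCount = 0;                 // task-local view of the counter
        List<String> emitted = new ArrayList<>();
        for (String record : split) {
            if (5L >= localCount) {          // same check as in the question
                localCount++;
                emitted.add(record);
            }
        }
        return emitted;
    }

    // Runs one task per split and returns the total number of emitted records.
    static int totalEmitted(int splits, int recordsPerSplit) {
        int total = 0;
        for (int s = 0; s < splits; s++) {
            List<String> split = new ArrayList<>();
            for (int r = 0; r < recordsPerSplit; r++) {
                split.add("split" + s + "-record" + r);
            }
            total += runMapTask(split).size();
        }
        return total;
    }

    public static void main(String[] args) {
        // 3 splits, 10 records each: every task emits 6 records (counts 0..5 pass).
        System.out.println(totalEmitted(3, 10)); // prints 18
    }
}
```

Note that the check `5L >= l` lets six records through per task, because the counts 0 through 5 all pass. Even with a strict `<` comparison, each task would still apply the limit independently.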

If you want to restrict your map output, use a single reducer with job.setNumReduceTasks(1) and restrict the output in the reducer, something like this:

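public static class WLReducer2 extends
        Reducer<IntWritable, Text, IntWritable, Text> {
    // Records seen so far; the field persists across reduce() calls.
    private int count = 0;

    @Override
    protected void reduce(IntWritable key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {
        for (Text x : values) {
            // The output types must match what context.write receives:
            // an IntWritable key and a Text value.
            if (count < 5) {
                context.write(key, x);
            }
            count++;
        }
    }
}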
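The reducer above works as a global cap because one reducer object handles every key in its reduce task, so the `count` field carries over from one `reduce` call to the next. A minimal plain-Java sketch of that behaviour follows. It is not the Hadoop API: `CappedReducerDemo` and its methods are hypothetical names used only for illustration, and the `LinkedHashMap` stands in for the grouped input of a single reduce task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a single reduce task: NOT the Hadoop API.
class CappedReducerDemo {
    private final int limit;
    private int count = 0;                      // survives across reduce() calls
    private final List<String> output = new ArrayList<>();

    CappedReducerDemo(int limit) {
        this.limit = limit;
    }

    // Hypothetical stand-in for Reducer.reduce(): called once per key.
    void reduce(int key, List<String> values) {
        for (String value : values) {
            if (count >= limit) {
                return;                         // cap reached, skip the rest
            }
            output.add(key + "\t" + value);
            count++;
        }
    }

    // Feeds every key group through one reducer instance, as one reduce task would.
    static List<String> run(Map<Integer, List<String>> grouped, int limit) {
        CappedReducerDemo reducer = new CappedReducerDemo(limit);
        for (Map.Entry<Integer, List<String>> e : grouped.entrySet()) {
            reducer.reduce(e.getKey(), e.getValue());
        }
        return reducer.output;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> grouped = new LinkedHashMap<>();
        grouped.put(1, Arrays.asList("a", "b", "c"));
        grouped.put(2, Arrays.asList("d", "e", "f"));
        System.out.println(run(grouped, 5).size()); // prints 5
    }
}
```

With more than one reduce task, each task would hold its own `count`, which is why the answer pairs this approach with a single reducer.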

If you want to read the counter value on the reduce side, you can do that in the reducer's setup method.

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        // Look up the running job through the cluster to read its counters.
        Cluster cluster = new Cluster(conf);
        Job currentJob = cluster.getJob(context.getJobID());
        // mapperCounter is assumed to be a long field declared on the reducer.
        mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();
    }

java  hadoop  mapreduce  counter