Hadoop -- commodity recommendation based on item-based collaborative filtering

Collaborative filtering algorithm:

The item-based collaborative filtering algorithm has two main steps:

1. Calculate the similarity between items: similarity can be measured by co-occurrence counts, the cosine of the angle between item vectors, or the Euclidean distance.

2. Generate a recommendation list for each user according to the item similarities and the user's historical purchase records.

Which items are finally recommended is determined by their recommendation scores.


Core idea: count how often every pair of goods is purchased together. If a user has purchased one product of a frequently co-purchased pair, recommend the other product of that pair to that user.
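
For example, hadoop (20001) and flume (20005) are bought together by two users (10001 and 10004), so that pair's co-occurrence count is 2; user 10005 has bought only hadoop, so flume becomes a candidate recommendation for user 10005 with a score of 2.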


Package structure:

The first step of the project: simulate the data and create the corresponding database tables

//Create user table
create table s_user(
id int primary key auto_increment,
name varchar(20),
age int,
phone varchar(20)
);
insert into s_user values(10001,'jake',20,'15023453003'),(10002,'rose',22,'18923452343'),(10003,'tom',21,'15113453001'),(10004,'briup',22,'18823452456'),(10005,'kevin',24,'15925671003'),(10006,'patel',28,'15983432459');


//Create item table
create table s_product(
id int primary key auto_increment,
name varchar(20),
price double,
description varchar(100),
kc double
);
insert into s_product values(20001,'hadoop',89,'bigdata',1000),(20002,'hbase',36,'bigdata',110),(20003,'mysql',58,'bigdata',190),(20004,'sqoop',28,'bigdata',70),(20005,'flume',34,'bigdata',109),(20006,'kafka',29,'bigdata',78),(20007,'hive',31,'bigdata',83);


//Create order table
create table s_order(
id int primary key auto_increment,
name varchar(20),
order_date timestamp default current_timestamp on update current_timestamp,
user_id int references s_user(id)
);
insert into s_order(id,name,user_id) values(1,'briup_store',10001),(2,'briup_store',10002),(3,'briup_store',10003),(4,'briup_store',10004),(5,'briup_store',10005),(6,'briup_store',10006),(7,'briup_store',10007);

//Create a bridge table between the order table and the product table
create table order_line(
order_id int references s_order(id),
product_id int references s_product(id),
num double,
primary key(order_id,product_id)
);
insert into order_line values(1,20001,1),(1,20002,1),(1,20005,1),(1,20006,1),(1,20007,1),(2,20003,1),(2,20004,1),(2,20006,1),(3,20002,1),(3,20007,1),(4,20001,1),(4,20002,1),(4,20005,1),(4,20006,1),(5,20001,1),(6,20004,1),(6,20007,1);

//Create the final product recommendation result table
create table recommend(
uid int references s_user(id),
gid int references s_product(id),
nums double,
primary key(uid,gid)
);

Step 2: migrate the data from MySQL to the HDFS distributed file system

sqoop import --connect jdbc:mysql://192.168.43.158:3306/briup --username root --password root --delete-target-dir --target-dir /user/hdfs/recommend --fields-terminated-by '\t' --query 'select s.user_id,d.product_id,d.num from s_order s,order_line d where s.id=d.order_id and $CONDITIONS' --m 1

(--fields-terminated-by '\t' is added because the MapReduce code below splits each line on a tab, while Sqoop's default field delimiter is a comma.)
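
After the import finishes, the exported data can be checked directly on HDFS; with the single map task above, the output is assumed to land in one part file:

hdfs dfs -cat /user/hdfs/recommend/part-m-00000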

Step 3: start programming

Raw data: user id	commodity id	number of purchases
        10001	20001	1
        10001	20002	1
        10001	20005	1
        10001	20006	1
        10001	20007	1
        10002	20003	1
        10002	20004	1
        10002	20006	1
        10003	20002	1
        10003	20007	1
        10004	20001	1
        10004	20002	1
        10004	20005	1
        10004	20006	1
        10005	20001	1
        10006	20004	1
        10006	20007	1

Step 1: calculate the list of goods purchased by the user

Data source: the raw data above.

package com.briup.mr.one;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
//Step 1: calculate the list of goods purchased by the user
    //Result data: 10001 	 20001,20005,20006,20007,20002
public class UserBuyGoodsList{


    //Input: 10001	20001	1
    public static class UserBuyGoodsListMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text outK = new Text();
        private Text outV = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            //Set the output key to user id
            outK.set(line[0]);
            outV.set(line[1]);
            context.write(outK, outV);
        }
    }

    public static class UserBuyGoodsListReducer extends Reducer<Text, Text, Text, Text> {
        private Text outV = new Text();

        //Result data: 10001	20001,20005,20006,20007,20002
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();

            for (Text value : values) {
                //Splice string
                sb.append(value.toString() + ",");
            }

            //Remove the ',' at the end of the string
            sb.setLength(sb.length() - 1);

            outV.set(sb.toString());
            context.write(key, outV);
            outV.clear();
        }
    }

}

Result data:

        10001	20001,20005,20006,20007,20002
        10002	20006,20003,20004
        10003	20002,20007
        10004	20001,20002,20005,20006
        10005	20001
        10006	20004,20007
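
If you want to test step 1 on its own before wiring up the whole job flow in step 9, a minimal standalone driver such as the sketch below can be used (the input/output paths are placeholders, and plain text output is used here instead of the sequence files that the chained jobs expect):

package com.briup.mr.one;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

//Minimal test driver for step 1 only (not part of the original article)
public class UserBuyGoodsListDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(UserBuyGoodsListDriver.class);
        job.setJobName("Step1:Calculate the list of items purchased by the user");

        job.setMapperClass(UserBuyGoodsList.UserBuyGoodsListMapper.class);
        job.setReducerClass(UserBuyGoodsList.UserBuyGoodsListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //Placeholder paths: the sqoop export directory as input, a test directory as output
        TextInputFormat.addInputPath(job, new Path("/user/hdfs/recommend"));
        TextOutputFormat.setOutputPath(job, new Path("/user/hdfs/recommend_step1_test"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}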

Step 2: calculate co-occurrence relationship of goods

Data source: calculation results in step 1

package com.briup.mr.two;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
//Calculate the co-occurrence relationship of commodities, i.e. every pairwise combination of items bought by the same user. Expected result lines look like: 20001	20001
//No reduce program is required
public class GoodsCooccurrenceList {
    //The data is read using the sequence file input format, and the read data is automatically divided based on keys and values
    public static class GoodsCooccurrenceListMapper extends Mapper<Text, Text, Text, NullWritable> {
        private StringBuffer sb = new StringBuffer();
        private Text outK = new Text();

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split(",");

            //Each item id is combined in pairs
            for (String s : line) {
                for (String s1 : line) {
                    sb.append(s).append("\t").append(s1);

                    outK.set(sb.toString());
                    context.write(outK, NullWritable.get());
                    sb.setLength(0);
                    outK.clear();
                }
            }
        }
    }
}

Calculation results:

			20001	20001   
            20001	20002  
            20001	20005   
            20001	20006
            20001	20007
            20001	20001
            20001	20006
            20001	20005
            20001	20002
            20002	20007
            20002	20001
            20002	20005
            20002	20006
            20002	20007
            20002	20002
            20002	20006
            20002	20005
            ...		...

Step 3: calculate the co-occurrence counts of goods (co-occurrence matrix)

Data source: results of step 2

package com.briup.mr.three;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
public class GoodsCooccurrenceMatrix {
    //input data:
    // 20001	20001
    // 20001	20002
    //20001	20005
    public static class GoodsCooccurrenceMatrixMapper extends Mapper<Text, NullWritable, Text, Text> {
        private Text outK = new Text();
        private Text outV = new Text();

        @Override
        protected void map(Text key, NullWritable value, Context context) throws IOException, InterruptedException {
            String[] line = key.toString().split("\t");
            outK.set(line[0]);
            outV.set(line[1]);

            context.write(outK, outV);
        }
    }

    public static class GoodsCooccurrenceMatrixReducer extends Reducer<Text, Text, Text, Text> {
        //Define a map to store the output key information
        private Map<String, Integer> map = new HashMap<String, Integer>();
        private StringBuffer sb = new StringBuffer();
        private Text outV = new Text();

        //input data:
        //20001   [20001, 20002, 20005, ...]
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text val : values) {
                if (map.containsKey(val.toString())){
                    //If the map contains this key
                    map.put(val.toString(),map.get(val.toString())+1);
                }else {
                    map.put(val.toString(),1);
                }
            }

            //Splice string
            for (Map.Entry<String, Integer> en : map.entrySet()) {
                sb.append(en.getKey()).append(":").append(en.getValue()).append(",");
            }
            //Remove the "," at the end
            sb.setLength(sb.length()-1);
            outV.set(sb.toString());

            context.write(key,outV);

            sb.setLength(0);
            map.clear();
            outV.clear();
        }
    }
}

Calculation results:

          	20001	20001:3,20002:2,20005:2,20006:2,20007:1
            20002	20001:2,20002:3,20005:2,20006:2,20007:2
            20003	20003:1,20004:1,20006:1
            20004	20003:1,20004:2,20006:1,20007:1
            20005	20001:2,20002:2,20005:2,20006:2,20007:1
            20006	20001:2,20002:2,20003:1,20004:1,20005:2,20006:3,20007:1
            20007	20001:1,20002:2,20004:1,20005:1,20006:1,20007:3

Step 4: calculate the user's purchase vector

Data source: the result of step 1 or the original raw data.

package com.briup.mr.four;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */

//Calculate the user's purchase vector
public class UserBuyGoodsVector {
    /*
        Input data: 10001	20001	1 (read from the original source file)
     */
    public static class UserBuyGoodsVectorMapper extends Mapper<LongWritable, Text, Text, Text> {
        /*
            Commodity id is the key and user id is the value
         */
        private Text outK = new Text();
        private Text outV = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");

            outK.set(line[1]);
            outV.set(line[0]);

            context.write(outK, outV);
        }
    }

    public static class UserBuyGoodsVectorReducer extends Reducer<Text, Text, Text, Text> {
        /*
            Input: 20001  [10001, 10004, 10005]
         */
        private Text outV = new Text();
        private Map<String, Integer> map = new HashMap<>();
        private StringBuffer sb = new StringBuffer();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            /*
                The reduce output key is the commodity id and the value is the set of user id:count pairs
                result:
                20001	10001:1,10004:1,10005:1
             */

            for (Text value : values) {
                if (map.containsKey(value.toString())) {
                    map.put(value.toString(), map.get(value.toString()) + 1);
                } else {
                    map.put(value.toString(), 1);
                }
            }

            for (Map.Entry<String, Integer> en : map.entrySet()) {
                sb.append(en.getKey()).append(":").append(en.getValue()).append(",");
            }
            sb.setLength(sb.length()-1);
            outV.set(sb.toString());
            context.write(key,outV);

            //Reset Data 
            sb.setLength(0);
            map.clear();
            outV.clear();

        }
    }
}

Result data:

			20001	10001:1,10004:1,10005:1
            20002	10001:1,10003:1,10004:1
            20003	10002:1
            20004	10002:1,10006:1
            20005	10001:1,10004:1
            20006	10001:1,10002:1,10004:1
            20007	10001:1,10003:1,10006:1

Step 5: the commodity co-occurrence matrix is multiplied by the user's purchase vector to form a temporary recommendation result.

Raw data: result data of steps 3 and 4

Approach: the input comes from two files. The first is the result of step 3 (the item co-occurrence matrix) and the second is the result of step 4 (the users' purchase vectors). Therefore a single MR program needs two custom Mappers to process them separately, plus one custom Reducer that combines the intermediate results of the two Mappers, as illustrated below.
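
For example, for commodity 20001 the co-occurrence row from step 3 is 20001:3,20002:2,20005:2,20006:2,20007:1 and the purchase vector from step 4 is 10001:1,10004:1,10005:1. Multiplying them produces one partial score per (user, item) pair; for user 10001 this gives (10001,20001)=3, (10001,20002)=2, (10001,20005)=2, (10001,20006)=2, (10001,20007)=1, and the same is done for users 10004 and 10005. Step 6 later sums these partial scores per (user, item) key.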

GoodsBean class:

package com.briup.mr.five;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
public class GoodsBean implements WritableComparable<GoodsBean> {
    private String g_id;    //Commodity id
    //A flag of 1 indicates that the data comes from the co-occurrence times of commodities (result in step 3)
    //A flag of 0 indicates that the data comes from the user's purchase vector (step 4 result)
    private int flag;

    public GoodsBean() {
    }

    public GoodsBean(String g_id, int flag) {
        this.g_id = g_id;
        this.flag = flag;
    }

    public String getG_id() {
        return g_id;
    }

    public void setG_id(String g_id) {
        this.g_id = g_id;
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    @Override
    public int compareTo(GoodsBean o) {
        int n = this.g_id.compareTo(o.g_id);
        if (n != 0) {
            return n;
        } else {
            //Put the data of commodity co-occurrence table in front
            return -(this.flag - o.flag);
        }
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(g_id);
        dataOutput.writeInt(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.g_id = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }
}

MapReduce partitioner class GoodsPartitioner:

package com.briup.mr.five;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/28 20:07
 */
public class GoodsPartitioner extends Partitioner<GoodsBean, Text> {

    @Override
    public int getPartition(GoodsBean goodsBean, Text text, int numPartitions) {
        return Math.abs(Integer.parseInt(goodsBean.getG_id()) * 127) % numPartitions;
    }
}

MapReduce grouping class GoodsGroup:

package com.briup.mr.five;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/28 20:14
 */
public class GoodsGroup extends WritableComparator {
    public GoodsGroup() {
        super(GoodsBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        //Group based on commodity id, and those with the same id are divided into one group
        GoodsBean o = (GoodsBean) a;
        GoodsBean o1 = (GoodsBean) b;

        return o.getG_id().compareTo(o1.getG_id());
    }
}

MapReduce class (because GoodsBean.compareTo puts the co-occurrence data, flag = 1, ahead of the purchase vector, flag = 0, within the same commodity id, the reducer below can read the matrix row first and then the purchase vector):

package com.briup.mr.five;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.*;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */

/*
    step5:  The commodity co-occurrence matrix is multiplied by the user purchase vector to form a temporary recommendation result.
 */
public class MultiplyGoodsMatrixAndUserVector {

    /*
    Input data: result of step 3
        Item co-occurrence matrix: 20001 	 20005:2,20002:2,20001:3,20007:1,20006:2
     */

    public static class MultiplyGoodsMatrixAndUserVectorFirstMapper extends Mapper<Text, Text, GoodsBean, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            //The map output data is: GoodsBean(20001, 1)	20005:2,20002:2,20001:3,20007:1,20006:2
            context.write(new GoodsBean(key.toString(), 1), value);
        }
    }

    /*
    input data:
        User purchase vector: 20001 	 10001:1,10004:1,10005:1
     */
    public static class MultiplyGoodsMatrixAndUserVectorSecondMapper extends Mapper<Text, Text, GoodsBean, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            //The map output data is: GoodsBean(20001, 0)	10001:1,10004:1,10005:1
            context.write(new GoodsBean(key.toString(), 0), value);
        }
    }

    /*
    Expected output data: 10001,20001	2
     */
    public static class MultiplyGoodsMatrixAndUserVectorReducer extends Reducer<GoodsBean, Text, Text, DoubleWritable> {
        /*
            The data entering reduce is:
                key group for 20001 (co-occurrence record first, then purchase vector), values: 20005:2,20002:2,20001:3,20007:1,20006:2 and 10001:1,10004:1,10005:1
         */
        @Override
        protected void reduce(GoodsBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> iter = values.iterator();

            //The co-occurrence row comes first: 20005:2,20002:2,20001:3,20007:1,20006:2
            String[] goods = iter.next().toString().split(",");


            while (iter.hasNext()) {
                //Get the user's purchase vector: 10001:1,10004:1,10005:1
                String[] users = iter.next().toString().split(",");
//                System.out.println(Arrays.toString(users));
                for (String user : users) {
                    String[] uid_nums = user.split(":");

                    for (String good : goods) {
                        String[] gid_nums = good.split(":");
                        //sb output as key
                        StringBuffer sb = new StringBuffer();

                        sb.append(uid_nums[0]).append(",").append(gid_nums[0]);

                        context.write(new Text(sb.toString()), new DoubleWritable(Double.parseDouble(uid_nums[1]) * Double.parseDouble(gid_nums[1])));

                        sb.setLength(0);
                    }
                }
            }

        }
    }

}

Result data:

			10001,20001	2
			10001,20001	2
			10001,20001	3
			10001,20001	1
			10001,20001	2
			10001,20002	3
			10001,20002	2
			10001,20002	2
			10001,20002	2
			10001,20002	2
			...		...

Step 6: sum the scattered recommendation scores calculated in step 5.

Raw data: calculation results of step 5

package com.briup.mr.six;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 0:04
 */
//Step 6: sum the scattered recommendation scores calculated in step 5.
public class MakeSumForMultiplication {

    public static class MakeSumForMultiplicationMapper extends Mapper<Text, DoubleWritable, Text, DoubleWritable> {
        //Map input data: 10006,20007	3
        @Override
        protected void map(Text key, DoubleWritable value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static class MakeSumForMultiplicationReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            double sum = 0;

            for (DoubleWritable value : values) {
                sum += value.get();
            }
            context.write(key, new DoubleWritable(sum));
        }
    }
}

Result data:

			10001,20001	10
			10001,20002	11
			10001,20003	1
			10001,20004	2
			10001,20005	9
			10001,20006	10
			...		...

Step 7: deduplicate the data, removing products the user has already purchased from the recommendation results.

Data source:
1. FirstMapper processes the user's purchase list data.
2. SecondMapper processes the recommendation result data of step 6.

JavaBean class UserAndGoods:

package com.briup.mr.seven;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 0:24
 */
public class UserAndGoods implements WritableComparable<UserAndGoods> {
    private String userId;
    private String goodsId;
    //A flag of 1 indicates that the data comes from the source data
    //A flag of 0 indicates that the data comes from the result of step 6
    private int flag;

    public UserAndGoods() {
    }

    public UserAndGoods(String userId, String goodsId, int flag) {
        this.userId = userId;
        this.goodsId = goodsId;
        this.flag = flag;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getGoodsId() {
        return goodsId;
    }

    public void setGoodsId(String goodsId) {
        this.goodsId = goodsId;
    }

    @Override
    public int compareTo(UserAndGoods o) {

        int i = this.getUserId().compareTo(o.getUserId());
        //When the user ids differ
        if (i != 0) {
            return i;
        } else return this.getGoodsId().compareTo(o.getGoodsId());
    }

    @Override
    public String toString() {
        return "UserAndGoods{" +
                "userId='" + userId + '\'' +
                ", goodsId='" + goodsId + '\'' +
                ", flag=" + flag +
                '}';
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(userId);
        dataOutput.writeUTF(goodsId);
        dataOutput.writeInt(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.userId = dataInput.readUTF();
        this.goodsId = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }
}

MapReduce class:

package com.briup.mr.seven;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 0:19
 */
//The data is deduplicated: products the user has already purchased are removed from the recommendation result.
public class DuplicateDataForResult {

    //FirstMapper processes the user's purchase list data
    public static class DuplicateDataForResultFirstMapper extends Mapper<LongWritable, Text, UserAndGoods, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //A flag of 1 indicates that the data comes from the source data
            //A flag of 0 indicates that the data comes from the result of step 6
            String[] line = value.toString().split("\t");

            context.write(new UserAndGoods(line[0], line[1], 1), value);
        }
    }

    //The SecondMapper processes the recommendation result data of step 6
    public static class DuplicateDataForResultSecondMapper extends Mapper<Text, DoubleWritable, UserAndGoods, Text> {
        @Override
        protected void map(Text key, DoubleWritable value, Context context) throws IOException, InterruptedException {
            String[] line = key.toString().split(",");

            context.write(new UserAndGoods(line[0], line[1], 0), new Text(key.toString() + "\t" + value.get()));
        }
    }


    /*
        Expected reduce output: 10001	20004	2
     */
    public static class DuplicateDataForResultReducer extends Reducer<UserAndGoods, Text, Text, NullWritable> {
        @Override
        protected void reduce(UserAndGoods key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> iter = values.iterator();

            //The first record of the group
            Text res = iter.next();

            //A group containing both a purchase record and a recommendation record means the
            //user already bought this product, so it is skipped; a group with a single record
            //is a pure recommendation and is written out directly
            if (!iter.hasNext()) {
                context.write(res, NullWritable.get());
            }

        }
    }
}

Calculation results:

			10001	20004	2
			10001	20003	1
			10002	20002	2
			10002	20007	2
			10002	20001	2
			10002	20005	2
			10003	20006	3
			10003	20005	3
			10003	20001	3
			10003	20004	1
			10004	20007	5
			10004	20004	1
			10004	20003	1
			10005	20006	2
			10005	20002	2
			...		...

Step 8: save the recommendation results to the MySQL database

Data source: calculation results in step 7

package com.briup.mr.eight;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 16:51
 */
public class DataInDB {

    public static class DataInDBMapper extends Mapper<Text, NullWritable, RecommendResultBean, NullWritable> {
        @Override
        protected void map(Text key, NullWritable value, Context context) throws IOException, InterruptedException {
            String[] line = key.toString().split("\t");
            RecommendResultBean outK = new RecommendResultBean();
            outK.setNums(Double.parseDouble(line[1]));

            String[] split = line[0].split(",");
            outK.setUid(Integer.parseInt(split[0]));
            outK.setGid(Integer.parseInt(split[1]));

            context.write(outK, NullWritable.get());
        }
    }

    public static class DataInDBReducer extends Reducer<RecommendResultBean, NullWritable, RecommendResultBean, NullWritable> {
        @Override
        protected void reduce(RecommendResultBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}
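
Note: the DataInDB classes above and the FinalJob driver in step 9 rely on a RecommendResultBean class that is not listed in this article. A minimal sketch, assuming the new-API org.apache.hadoop.mapreduce.lib.db.DBWritable interface and the column order uid, gid, nums passed to DBOutputFormat.setOutput, could look like this:

package com.briup.mr.eight;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

//Sketch only: field names and column order are inferred from DataInDBMapper and DBOutputFormat.setOutput
public class RecommendResultBean implements WritableComparable<RecommendResultBean>, DBWritable {
    private int uid;     //user id
    private int gid;     //goods id
    private double nums; //recommendation score

    public int getUid() { return uid; }
    public void setUid(int uid) { this.uid = uid; }
    public int getGid() { return gid; }
    public void setGid(int gid) { this.gid = gid; }
    public double getNums() { return nums; }
    public void setNums(double nums) { this.nums = nums; }

    @Override
    public int compareTo(RecommendResultBean o) {
        int n = Integer.compare(uid, o.uid);
        return n != 0 ? n : Integer.compare(gid, o.gid);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof RecommendResultBean)) return false;
        RecommendResultBean other = (RecommendResultBean) o;
        return uid == other.uid && gid == other.gid;
    }

    @Override
    public int hashCode() {
        return 31 * uid + gid;
    }

    //Hadoop serialization between map and reduce
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(uid);
        out.writeInt(gid);
        out.writeDouble(nums);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        uid = in.readInt();
        gid = in.readInt();
        nums = in.readDouble();
    }

    //Column order must match DBOutputFormat.setOutput(job, "recommend", "uid", "gid", "nums")
    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setInt(1, uid);
        ps.setInt(2, gid);
        ps.setDouble(3, nums);
    }

    @Override
    public void readFields(ResultSet rs) throws SQLException {
        uid = rs.getInt("uid");
        gid = rs.getInt("gid");
        nums = rs.getDouble("nums");
    }
}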

Result:

(Screenshot of the final records in the recommend table omitted; the original image is no longer available.)
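
Once the rows are in MySQL, the recommendations can be spot-checked per user, for example by joining against the product table:

select r.uid, p.name as product, r.nums as score
from recommend r join s_product p on r.gid = p.id
order by r.uid, r.nums desc;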

Step 9: build the job flow and submit the jobs to the cluster for execution

package com.briup.mr;

import com.briup.mr.eight.DataInDB;
import com.briup.mr.eight.RecommendResultBean;
import com.briup.mr.five.GoodsBean;
import com.briup.mr.five.GoodsGroup;
import com.briup.mr.five.GoodsPartitioner;
import com.briup.mr.five.MultiplyGoodsMatrixAndUserVector;
import com.briup.mr.four.UserBuyGoodsVector;
import com.briup.mr.one.UserBuyGoodsList;
import com.briup.mr.seven.DuplicateDataForResult;
import com.briup.mr.seven.UserAndGoods;
import com.briup.mr.six.MakeSumForMultiplication;
import com.briup.mr.three.GoodsCooccurrenceMatrix;
import com.briup.mr.two.GoodsCooccurrenceList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/27 16:36
 */
public class FinalJob extends Configured implements Tool {
    @Override
    public int run(String[] strings) throws Exception {
        //The first step is to calculate the job task of the result
        Configuration conf = getConf();

//        Path input = new Path("C:\\Users\\ASUS\\Desktop\\briup\\big data\\input.txt");
//        Path one_output = new Path("C:\\Users\\ASUS\\Desktop\\briup\\big data\\out11");
//        Path two_output = new Path("C:\\Users\\ASUS\\Desktop\\briup\\big data\\out22");
//        Path three_output = new Path("C:\\Users\\ASUS\\Desktop\\briup\\big data\\out33");
//        Path four_output = new Path("C:\\Users\\ASUS\\Desktop\\briup\\big data\\out44");
//        Path five_output = new Path("C:\\Users\\ASUS\\Desktop\\briup\\big data\\out55");
//        Path six_output = new Path("C:\\Users\\ASUS\\Desktop\\briup\\big data\\out66");
//        Path seven_output = new Path("C:\\Users\\ASUS\\Desktop\\briup\\big data\\out77");


        Path input = new Path("/user/zhudz/goods/input.txt");
        Path one_output = new Path("/user/zhudz/goods/out11");
        Path two_output = new Path("/user/zhudz/goods/out22");
        Path three_output = new Path("/user/zhudz/goods/out33");
        Path four_output = new Path("/user/zhudz/goods/out44");
        Path five_output = new Path("/user/zhudz/goods/out55");
        Path six_output = new Path("/user/zhudz/goods/out66");
        Path seven_output = new Path("/user/zhudz/goods/out77");


        FileSystem fs = FileSystem.get(conf);

        //Judge whether the output path exists, and delete it if it exists

        if (fs.exists(one_output)) {
            fs.delete(one_output, true);
        }
        if (fs.exists(two_output)) {
            fs.delete(two_output, true);
        }
        if (fs.exists(three_output)) {
            fs.delete(three_output, true);
        }
        if (fs.exists(four_output)) {
            fs.delete(four_output, true);
        }
        if (fs.exists(five_output)) {
            fs.delete(five_output, true);
        }
        if (fs.exists(six_output)) {
            fs.delete(six_output, true);
        }
        if (fs.exists(seven_output)) {
            fs.delete(seven_output, true);
        }


        //Step 1: calculate the list of goods purchased by the user
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("Step1:Calculate the list of items purchased by the user");

        job.setMapperClass(UserBuyGoodsList.UserBuyGoodsListMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(UserBuyGoodsList.UserBuyGoodsListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        TextInputFormat.addInputPath(job, input);
        SequenceFileOutputFormat.setOutputPath(job, one_output);


        //Step 2: calculate the job task of the result
        Job job1 = Job.getInstance(conf);
        job1.setJarByClass(this.getClass());
        job1.setJobName("Step2:Calculate co-occurrence relationship of goods");

        job1.setMapperClass(GoodsCooccurrenceList.GoodsCooccurrenceListMapper.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(NullWritable.class);

        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(NullWritable.class);

        job1.setInputFormatClass(SequenceFileInputFormat.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);

        SequenceFileInputFormat.addInputPath(job1, one_output);
        SequenceFileOutputFormat.setOutputPath(job1, two_output);

        //Step 3: calculate the job task of the result
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(this.getClass());
        job2.setJobName("Step3:Number of CO cash commodities(Co-occurrence matrix)");

        job2.setMapperClass(GoodsCooccurrenceMatrix.GoodsCooccurrenceMatrixMapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);


        job2.setReducerClass(GoodsCooccurrenceMatrix.GoodsCooccurrenceMatrixReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);

        job2.setInputFormatClass(SequenceFileInputFormat.class);
        job2.setOutputFormatClass(SequenceFileOutputFormat.class);

        SequenceFileInputFormat.addInputPath(job2, two_output);
        SequenceFileOutputFormat.setOutputPath(job2, three_output);

        //Step 4 calculate user purchase vector
        Job job3 = Job.getInstance(conf);
        job3.setJarByClass(this.getClass());
        job3.setJobName("Step3:Calculate the user's purchase vector");

        job3.setMapperClass(UserBuyGoodsVector.UserBuyGoodsVectorMapper.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);


        job3.setReducerClass(UserBuyGoodsVector.UserBuyGoodsVectorReducer.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);

        //Read source file data
        job3.setInputFormatClass(TextInputFormat.class);
        job3.setOutputFormatClass(SequenceFileOutputFormat.class);

        TextInputFormat.addInputPath(job3, input);
        SequenceFileOutputFormat.setOutputPath(job3, four_output);

        //Step 5 calculate the product co-occurrence matrix multiplied by the user purchase vector
        Job job4 = Job.getInstance(conf);
        job4.setJarByClass(this.getClass());
        job4.setJobName("Step4:The commodity co-occurrence matrix is multiplied by the user purchase vector to form a temporary recommendation result");

        // Build multiple different map tasks
        //Co occurrence times of goods mapper
        MultipleInputs.addInputPath(job4,
                three_output,
                SequenceFileInputFormat.class, MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorFirstMapper.class);

        //User purchases vector mapper
        MultipleInputs.addInputPath(job4,
                four_output,
                SequenceFileInputFormat.class, MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorSecondMapper.class);

        job4.setMapOutputKeyClass(GoodsBean.class);
        job4.setMapOutputValueClass(Text.class);

        job4.setPartitionerClass(GoodsPartitioner.class);
        job4.setGroupingComparatorClass(GoodsGroup.class);

        job4.setReducerClass(MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorReducer.class);
        job4.setOutputKeyClass(Text.class);
        job4.setOutputValueClass(DoubleWritable.class);

        job4.setOutputFormatClass(SequenceFileOutputFormat.class);

        SequenceFileOutputFormat.setOutputPath(job4, five_output);

        //Step 6: sum the recommended scattered results calculated in step 5
        Job job5 = Job.getInstance(conf);
        job5.setJarByClass(this.getClass());
        job5.setJobName("Step6:Sum the recommended scattered results calculated in step 5");

        job5.setMapperClass(MakeSumForMultiplication.MakeSumForMultiplicationMapper.class);
        job5.setMapOutputKeyClass(Text.class);
        job5.setMapOutputValueClass(DoubleWritable.class);

        job5.setReducerClass(MakeSumForMultiplication.MakeSumForMultiplicationReducer.class);
        job5.setOutputKeyClass(Text.class);
        job5.setOutputValueClass(DoubleWritable.class);

        job5.setInputFormatClass(SequenceFileInputFormat.class);
        job5.setOutputFormatClass(SequenceFileOutputFormat.class);

        SequenceFileInputFormat.addInputPath(job5, five_output);
        SequenceFileOutputFormat.setOutputPath(job5, six_output);


        //Step 7
        Job job6 = Job.getInstance(conf);
        job6.setJarByClass(this.getClass());
        job6.setJobName("Step7:The data is de duplicated, and the product information purchased by the user is removed from the recommendation result");

        // Build multiple different map tasks
        //FirstMapper processes the user's purchase list data.
        MultipleInputs.addInputPath(job6,
                input,
                TextInputFormat.class,
                DuplicateDataForResult.DuplicateDataForResultFirstMapper.class);

        //SecondMapper processes the recommendation result data of the 6th.
        MultipleInputs.addInputPath(job6,
                six_output,
                SequenceFileInputFormat.class,
                DuplicateDataForResult.DuplicateDataForResultSecondMapper.class);

        job6.setMapOutputKeyClass(UserAndGoods.class);
        job6.setMapOutputValueClass(Text.class);

        //Set grouping
        // job6.setGroupingComparatorClass(DuplicateDataGroup.class);

        job6.setReducerClass(DuplicateDataForResult.DuplicateDataForResultReducer.class);
        job6.setOutputKeyClass(Text.class);
        job6.setOutputValueClass(NullWritable.class);

        job6.setOutputFormatClass(SequenceFileOutputFormat.class);

        SequenceFileOutputFormat.setOutputPath(job6, seven_output);


        //Step 8: save the recommendation results to the MySQL database. The data comes from step 7
        Job job7 = Job.getInstance(conf);
        job7.setJarByClass(this.getClass());
        job7.setJobName("Step8:Save recommendation results to MySQL In database");

        DBConfiguration.configureDB(job7.getConfiguration(), "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.10.131/briup", "root", "root");
        DBOutputFormat.setOutput(job7, "recommend", "uid", "gid", "nums");

        job7.setMapperClass(DataInDB.DataInDBMapper.class);
        job7.setMapOutputKeyClass(RecommendResultBean.class);
        job7.setMapOutputValueClass(NullWritable.class);

        job7.setReducerClass(DataInDB.DataInDBReducer.class);
        job7.setOutputKeyClass(RecommendResultBean.class);
        job7.setOutputValueClass(NullWritable.class);

        job7.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.addInputPath(job7, seven_output);
        job7.setOutputFormatClass(DBOutputFormat.class);


        //final: build job flow
        ControlledJob contro_job = new ControlledJob(conf);
        contro_job.setJob(job);

        ControlledJob contro_job1 = new ControlledJob(conf);
        contro_job1.setJob(job1);
        contro_job1.addDependingJob(contro_job);

        ControlledJob contro_job2 = new ControlledJob(conf);
        contro_job2.setJob(job2);
        contro_job2.addDependingJob(contro_job1);

        ControlledJob contro_job3 = new ControlledJob(conf);
        contro_job3.setJob(job3);

        ControlledJob contro_job4 = new ControlledJob(conf);
        contro_job4.setJob(job4);
        contro_job4.addDependingJob(contro_job2);
        contro_job4.addDependingJob(contro_job3);

        ControlledJob contro_job5 = new ControlledJob(conf);
        contro_job5.setJob(job5);
        contro_job5.addDependingJob(contro_job4);

        ControlledJob contro_job6 = new ControlledJob(conf);
        contro_job6.setJob(job6);
        contro_job6.addDependingJob(contro_job5);

        ControlledJob contro_job7 = new ControlledJob(conf);
        contro_job7.setJob(job7);
        contro_job7.addDependingJob(contro_job6);

        JobControl jobs = new JobControl("goods_recommends");
        jobs.addJob(contro_job);
        jobs.addJob(contro_job1);
        jobs.addJob(contro_job2);
        jobs.addJob(contro_job3);
        jobs.addJob(contro_job4);
        jobs.addJob(contro_job5);
        jobs.addJob(contro_job6);
        jobs.addJob(contro_job7);

        Thread t = new Thread(jobs);
        t.start();

        //Print log
        while (true) {
            for (ControlledJob c : jobs.getRunningJobList()) {
                c.getJob().monitorAndPrintJob();
            }
            if (jobs.allFinished()) break;
        }

        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FinalJob(), args));
    }

}

Step 10: write shell scripts on the cluster and schedule the tasks to run periodically

1. Write a script to migrate the data in mysql to the hdfs file system

sudo vi mysqlToHDFS.sh
#Add execution permission when finished
sudo chmod +x mysqlToHDFS.sh
#!/bin/bash
sqoop import --connect jdbc:mysql://localhost:3306/briup --username root --password root --delete-target-dir --target-dir /user/hdfs/recommend --fields-terminated-by '\t' --query 'select s.user_id,d.product_id,d.num from s_order s,order_line d where s.id=d.order_id and $CONDITIONS' --m 1

2. Write a script that submits this project's job to the cluster

sudo vi recomendMR.sh
#Add execution permission when finished
sudo chmod +x recomendMR.sh
#!/bin/bash
yarn jar /home/hdfs/GoodsRecommend-1.0-SNAPSHOT.jar com.briup.mr.FinalJob

3. Set scheduled tasks

crontab -e

#At 7 a.m. every Tuesday (day-of-week 2), run the task that migrates data from MySQL to HDFS
0 7 * * 2 sh ~/bin/mysqlToHDFS.sh

#At 7:30 a.m. every Tuesday, run the task that submits this project's job to the cluster
30 7 * * 2 sh ~/bin/recomendMR.sh
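
#The installed schedule can be confirmed afterwards by listing the current user's crontab
crontab -l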

Project source code: https://gitee.com/zhu-dezhong/recomendGoods
