Collaborative filtering algorithm:
The item-based collaborative filtering algorithm has two main steps:
1. Calculate the similarity between items: the similarity can be measured by co-occurrence counts, cosine similarity, or Euclidean distance (a cosine formula is given below for reference).
2. Generate a recommendation list for each user according to the item similarities and the user's historical purchase records.
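For reference, the cosine similarity between items i and j, computed over their user purchase vectors, is

$$
sim(i,j)=\frac{\sum_{u} r_{u,i}\, r_{u,j}}{\sqrt{\sum_{u} r_{u,i}^{2}}\;\sqrt{\sum_{u} r_{u,j}^{2}}}
$$

where $r_{u,i}$ is the number of times user $u$ bought item $i$. This project uses the simpler co-occurrence counts as the similarity measure.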
Which items are finally recommended is determined by the recommendation score.
Core idea: count, for every pair of products, how many times they were bought by the same user; when a user has purchased one product of a frequently co-purchased pair, the other product of that pair is recommended to the user.
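To make the computation concrete, here is a minimal single-machine sketch of the same idea (illustration only; the class and variable names are ours and are not part of the project code). The MapReduce pipeline below computes the same co-occurrence counts and scores, just split across several jobs:

```java
import java.util.*;

// Minimal in-memory sketch of item-based CF with co-occurrence counts (illustration only).
public class ItemCfSketch {
    public static void main(String[] args) {
        // (userId, productId) purchase records, a small subset of the raw data used later
        String[][] purchases = {
                {"10001","20001"},{"10001","20002"},{"10001","20005"},{"10001","20006"},{"10001","20007"},
                {"10002","20003"},{"10002","20004"},{"10002","20006"},
                {"10003","20002"},{"10003","20007"}
        };
        // user -> set of purchased products
        Map<String, Set<String>> userItems = new HashMap<>();
        for (String[] p : purchases)
            userItems.computeIfAbsent(p[0], k -> new HashSet<>()).add(p[1]);

        // co-occurrence counts: product -> (product -> times bought by the same user)
        Map<String, Map<String, Integer>> cooc = new HashMap<>();
        for (Set<String> items : userItems.values())
            for (String a : items)
                for (String b : items)
                    cooc.computeIfAbsent(a, k -> new HashMap<>()).merge(b, 1, Integer::sum);

        // score = co-occurrence rows of the purchased products, summed per candidate product
        for (Map.Entry<String, Set<String>> e : userItems.entrySet()) {
            Map<String, Double> scores = new TreeMap<>();
            for (String bought : e.getValue())
                for (Map.Entry<String, Integer> c : cooc.get(bought).entrySet())
                    scores.merge(c.getKey(), (double) c.getValue(), Double::sum);
            scores.keySet().removeAll(e.getValue()); // drop products already purchased (step 7 below)
            System.out.println(e.getKey() + " -> " + scores);
        }
    }
}
```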
Package structure:
The first step of the project: simulate the data and create the corresponding database tables
```sql
-- Create user table
create table s_user(
  id int primary key auto_increment,
  name varchar(20),
  age int,
  phone varchar(20)
);
insert into s_user values
(10001,'jake',20,'15023453003'),
(10002,'rose',22,'18923452343'),
(10003,'tom',21,'15113453001'),
(10004,'briup',22,'18823452456'),
(10005,'kevin',24,'15925671003'),
(10006,'patel',28,'15983432459');

-- Create product table
create table s_product(
  id int primary key auto_increment,
  name varchar(20),
  price double,
  description varchar(100),
  kc double
);
insert into s_product values
(20001,'hadoop',89,'bigdata',1000),
(20002,'hbase',36,'bigdata',110),
(20003,'mysql',58,'bigdata',190),
(20004,'sqoop',28,'bigdata',70),
(20005,'flume',34,'bigdata',109),
(20006,'kafka',29,'bigdata',78),
(20007,'hive',31,'bigdata',83);

-- Create order table
create table s_order(
  id int primary key auto_increment,
  name varchar(20),
  order_date timestamp default current_timestamp on update current_timestamp,
  user_id int references s_user(id)
);
insert into s_order(id,name,user_id) values
(1,'briup_store',10001),
(2,'briup_store',10002),
(3,'briup_store',10003),
(4,'briup_store',10004),
(5,'briup_store',10005),
(6,'briup_store',10006),
(7,'briup_store',10007);

-- Create the bridge table between the order table and the product table
create table order_line(
  order_id int references s_order(id),
  product_id int references s_product(id),
  num double,
  primary key(order_id,product_id)
);
insert into order_line values
(1,20001,1),(1,20002,1),(1,20005,1),(1,20006,1),(1,20007,1),
(2,20003,1),(2,20004,1),(2,20006,1),
(3,20002,1),(3,20007,1),
(4,20001,1),(4,20002,1),(4,20005,1),(4,20006,1),
(5,20001,1),
(6,20004,1),(6,20007,1);

-- Create the final product recommendation result table
create table recommend(
  uid int references s_user(id),
  gid int references s_product(id),
  nums double,
  primary key(uid,gid)
);
```
Step 2: migrate the data from MySQL to the HDFS distributed file system with Sqoop
```bash
sqoop import \
  --connect jdbc:mysql://192.168.43.158:3306/briup \
  --username root --password root \
  --delete-target-dir \
  --target-dir /user/hdfs/recommend \
  --query 'select s.user_id,d.product_id,d.num from s_order s,order_line d where s.id=d.order_id and $CONDITIONS' \
  --m 1
```

Note that Sqoop requires the $CONDITIONS placeholder in the WHERE clause of a free-form --query import; --m 1 runs the import with a single map task.
Step 3: start programming
Raw data (user id, product id, number of purchases):

```
10001	20001	1
10001	20002	1
10001	20005	1
10001	20006	1
10001	20007	1
10002	20003	1
10002	20004	1
10002	20006	1
10003	20002	1
10003	20007	1
10004	20001	1
10004	20002	1
10004	20005	1
10004	20006	1
10005	20001	1
10006	20004	1
10006	20007	1
```
Step 1: calculate the list of goods purchased by the user
Data source: the raw data above.
```java
package com.briup.mr.one;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
//Step 1: calculate the list of goods purchased by each user
//Result data: 10001	20001,20005,20006,20007,20002
public class UserBuyGoodsList {

    //Input: 10001	20001	1
    public static class UserBuyGoodsListMapper extends Mapper<LongWritable, Text, Text, Text> {
        private Text outK = new Text();
        private Text outV = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            //The output key is the user id, the value is the product id
            outK.set(line[0]);
            outV.set(line[1]);
            context.write(outK, outV);
        }
    }

    //Result data: 10001	20001,20005,20006,20007,20002
    public static class UserBuyGoodsListReducer extends Reducer<Text, Text, Text, Text> {
        private Text outV = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Text value : values) {
                //Concatenate the product ids
                sb.append(value.toString()).append(",");
            }
            //Remove the trailing ','
            sb.setLength(sb.length() - 1);
            outV.set(sb.toString());
            context.write(key, outV);
            outV.clear();
        }
    }
}
```
Result data:
```
10001	20001,20005,20006,20007,20002
10002	20006,20003,20004
10003	20002,20007
10004	20001,20002,20005,20006
10005	20001
10006	20004,20007
```
Step 2: calculate co-occurrence relationship of goods
Data source: calculation results in step 1
```java
package com.briup.mr.two;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
//Calculate the co-occurrence relationship of goods, i.e. every pair of products bought by the same user.
//Expected result lines look like: 20001	20001
//No reducer is required.
public class GoodsCooccurrenceList {

    //The data is read with the sequence file input format, so the key and value of step 1's output are already split
    public static class GoodsCooccurrenceListMapper extends Mapper<Text, Text, Text, NullWritable> {
        private StringBuffer sb = new StringBuffer();
        private Text outK = new Text();

        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split(",");
            //Combine the product ids in pairs (including each product with itself)
            for (String s : line) {
                for (String s1 : line) {
                    sb.append(s).append("\t").append(s1);
                    outK.set(sb.toString());
                    context.write(outK, NullWritable.get());
                    sb.setLength(0);
                    outK.clear();
                }
            }
        }
    }
}
```
Calculation results:
```
20001	20001
20001	20002
20001	20005
20001	20006
20001	20007
20001	20001
20001	20006
20001	20005
20001	20002
20002	20007
20002	20001
20002	20005
20002	20006
20002	20007
20002	20002
20002	20006
20002	20005
... ...
```
Step 3: calculate the co-occurrence counts of goods (the co-occurrence matrix)
Data source: results of step 2
```java
package com.briup.mr.three;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
public class GoodsCooccurrenceMatrix {

    //Input data (the keys of step 2's output):
    //20001	20001
    //20001	20002
    //20001	20005
    public static class GoodsCooccurrenceMatrixMapper extends Mapper<Text, NullWritable, Text, Text> {
        private Text outK = new Text();
        private Text outV = new Text();

        @Override
        protected void map(Text key, NullWritable value, Context context) throws IOException, InterruptedException {
            String[] line = key.toString().split("\t");
            outK.set(line[0]);
            outV.set(line[1]);
            context.write(outK, outV);
        }
    }

    public static class GoodsCooccurrenceMatrixReducer extends Reducer<Text, Text, Text, Text> {
        //A map that counts how often each product co-occurs with the key product
        private Map<String, Integer> map = new HashMap<String, Integer>();
        private StringBuffer sb = new StringBuffer();
        private Text outV = new Text();

        //Input data:
        //20001	[20001, 20002, 20005, ...]
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text val : values) {
                if (map.containsKey(val.toString())) {
                    //The map already contains this product id
                    map.put(val.toString(), map.get(val.toString()) + 1);
                } else {
                    map.put(val.toString(), 1);
                }
            }
            //Concatenate the counts
            for (Map.Entry<String, Integer> en : map.entrySet()) {
                sb.append(en.getKey()).append(":").append(en.getValue()).append(",");
            }
            //Remove the trailing ","
            sb.setLength(sb.length() - 1);
            outV.set(sb.toString());
            context.write(key, outV);
            sb.setLength(0);
            map.clear();
            outV.clear();
        }
    }
}
```
Calculation results:
```
20001	20001:3,20002:2,20005:2,20006:2,20007:1
20002	20001:2,20002:3,20005:2,20006:2,20007:2
20003	20003:1,20004:1,20006:1
20004	20003:1,20004:2,20006:1,20007:1
20005	20001:2,20002:2,20005:2,20006:2,20007:1
20006	20001:2,20002:2,20003:1,20004:1,20005:2,20006:3,20007:1
20007	20001:1,20002:2,20004:1,20005:1,20006:1,20007:3
```
Step 4: calculate the user's purchase vector
Data source: the result of step 1 or the raw data (the implementation below reads the raw data again).
```java
package com.briup.mr.four;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
//Calculate the user's purchase vector
public class UserBuyGoodsVector {

    //Input data: 10001	20001	1 (the raw source file is read again)
    public static class UserBuyGoodsVectorMapper extends Mapper<LongWritable, Text, Text, Text> {
        //The product id is the key and the user id is the value
        private Text outK = new Text();
        private Text outV = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            outK.set(line[1]);
            outV.set(line[0]);
            context.write(outK, outV);
        }
    }

    public static class UserBuyGoodsVectorReducer extends Reducer<Text, Text, Text, Text> {
        //Input: 20001	[10001, 10004, ...]
        private Text outV = new Text();
        private Map<String, Integer> map = new HashMap<>();
        private StringBuffer sb = new StringBuffer();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            //The output key is the product id and the value is the set of user ids with purchase counts,
            //e.g. 20001	10001:1,10004:1,10005:1
            for (Text value : values) {
                if (map.containsKey(value.toString())) {
                    map.put(value.toString(), map.get(value.toString()) + 1);
                } else {
                    map.put(value.toString(), 1);
                }
            }
            for (Map.Entry<String, Integer> en : map.entrySet()) {
                sb.append(en.getKey()).append(":").append(en.getValue()).append(",");
            }
            sb.setLength(sb.length() - 1);
            outV.set(sb.toString());
            context.write(key, outV);
            //Reset the buffers for the next key
            sb.setLength(0);
            map.clear();
            outV.clear();
        }
    }
}
```
Result data:
```
20001	10001:1,10004:1,10005:1
20002	10001:1,10003:1,10004:1
20003	10002:1
20004	10002:1,10006:1
20005	10001:1,10004:1
20006	10001:1,10002:1,10004:1
20007	10001:1,10003:1,10006:1
```
Step 5: the commodity co-occurrence matrix is multiplied by the user's purchase vector to form a temporary recommendation result.
Raw data: result data of steps 3 and 4
Thinking: the input comes from two files. The first is the result of step 3 (the item co-occurrence matrix) and the second is the result of step 4 (the user purchase vectors). Therefore one MR program needs two custom mappers to process them separately, and a custom reducer to join the two mappers' intermediate results (a worked example follows).
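For intuition, this is what one reduce group computes, using the item 20001 rows from the step 3 and step 4 results above. It is an illustration only (the class name MultiplyRowExample is ours); the actual reducer below does the same string splitting and multiplication:

```java
// Multiply one co-occurrence row by the matching purchase vector row (illustration only).
public class MultiplyRowExample {
    public static void main(String[] args) {
        String goodsRow = "20001:3,20002:2,20005:2,20006:2,20007:1"; // step 3 row for item 20001
        String userRow  = "10001:1,10004:1,10005:1";                 // step 4 row for item 20001
        for (String u : userRow.split(",")) {
            String[] un = u.split(":");
            for (String g : goodsRow.split(",")) {
                String[] gn = g.split(":");
                // partial score for (user, candidate item) = purchase count * co-occurrence count
                double partial = Double.parseDouble(un[1]) * Double.parseDouble(gn[1]);
                System.out.println(un[0] + "," + gn[0] + "\t" + partial); // e.g. 10001,20001	3.0
            }
        }
    }
}
```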
GoodsBean class:
```java
package com.briup.mr.five;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
public class GoodsBean implements WritableComparable<GoodsBean> {
    private String g_id; //Product id
    //A flag of 1 indicates that the data comes from the commodity co-occurrence matrix (result of step 3)
    //A flag of 0 indicates that the data comes from the user's purchase vector (result of step 4)
    private int flag;

    public GoodsBean() {
    }

    public GoodsBean(String g_id, int flag) {
        this.g_id = g_id;
        this.flag = flag;
    }

    public String getG_id() {
        return g_id;
    }

    public void setG_id(String g_id) {
        this.g_id = g_id;
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    @Override
    public int compareTo(GoodsBean o) {
        int n = this.g_id.compareTo(o.g_id);
        if (n != 0) {
            return n;
        } else {
            //Sort the co-occurrence matrix record (flag = 1) before the purchase vector record
            return -(this.flag - o.flag);
        }
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(g_id);
        dataOutput.writeInt(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.g_id = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }
}
```
MapReduce partitioner class GoodsPartitioner:
```java
package com.briup.mr.five;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/28 20:07
 */
//Partition by product id so that both records for the same product go to the same reducer
public class GoodsPartitioner extends Partitioner<GoodsBean, Text> {
    @Override
    public int getPartition(GoodsBean goodsBean, Text text, int numPartitions) {
        return Math.abs(Integer.parseInt(goodsBean.getG_id()) * 127) % numPartitions;
    }
}
```
MapReduce grouping comparator class GoodsGroup:
```java
package com.briup.mr.five;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/28 20:14
 */
public class GoodsGroup extends WritableComparator {
    public GoodsGroup() {
        super(GoodsBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        //Group by product id only, so records with the same id end up in one reduce call
        GoodsBean o = (GoodsBean) a;
        GoodsBean o1 = (GoodsBean) b;
        return o.getG_id().compareTo(o1.getG_id());
    }
}
```
MapReduce class MultiplyGoodsMatrixAndUserVector:
```java
package com.briup.mr.five;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 */
//Step 5: multiply the commodity co-occurrence matrix by the user purchase vector to form a temporary recommendation result.
public class MultiplyGoodsMatrixAndUserVector {

    //Input data (result of step 3, the item co-occurrence matrix):
    //20001	20005:2,20002:2,20001:3,20007:1,20006:2
    public static class MultiplyGoodsMatrixAndUserVectorFirstMapper extends Mapper<Text, Text, GoodsBean, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            //The map output is: GoodsBean(20001, 1)	20005:2,20002:2,20001:3,20007:1,20006:2
            context.write(new GoodsBean(key.toString(), 1), value);
        }
    }

    //Input data (result of step 4, the user purchase vector):
    //20001	10001:1,10004:1,10005:1
    public static class MultiplyGoodsMatrixAndUserVectorSecondMapper extends Mapper<Text, Text, GoodsBean, Text> {
        @Override
        protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            //The map output is: GoodsBean(20001, 0)	10001:1,10004:1,10005:1
            context.write(new GoodsBean(key.toString(), 0), value);
        }
    }

    //Expected output data: 10001,20001	2
    public static class MultiplyGoodsMatrixAndUserVectorReducer extends Reducer<GoodsBean, Text, Text, DoubleWritable> {
        //The data entering one reduce group is, in order:
        //keys:   GoodsBean(20001,1), GoodsBean(20001,0)
        //values: 20005:2,20002:2,20001:3,20007:1,20006:2  ||  10001:1,10004:1,10005:1
        @Override
        protected void reduce(GoodsBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> iter = values.iterator();
            //The first value is the co-occurrence row: 20005:2,20002:2,20001:3,20007:1,20006:2
            String[] goods = iter.next().toString().split(",");
            while (iter.hasNext()) {
                //The remaining values are the user purchase vector: 10001:1,10004:1,10005:1
                String[] users = iter.next().toString().split(",");
                for (String user : users) {
                    String[] uid_nums = user.split(":");
                    for (String good : goods) {
                        String[] gid_nums = good.split(":");
                        //The output key is "userId,goodsId", the value is the partial product
                        StringBuffer sb = new StringBuffer();
                        sb.append(uid_nums[0]).append(",").append(gid_nums[0]);
                        context.write(new Text(sb.toString()),
                                new DoubleWritable(Double.parseDouble(uid_nums[1]) * Double.parseDouble(gid_nums[1])));
                        sb.setLength(0);
                    }
                }
            }
        }
    }
}
```
Result data:
```
10001,20001	2
10001,20001	2
10001,20001	3
10001,20001	1
10001,20001	2
10001,20002	3
10001,20002	2
10001,20002	2
10001,20002	2
10001,20002	2
... ...
```
Step 6: sum the recommended scattered results calculated in step 5.
Raw data: calculation results of step 5
```java
package com.briup.mr.six;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 0:04
 */
//Step 6: sum the scattered partial results calculated in step 5.
public class MakeSumForMultiplication {

    public static class MakeSumForMultiplicationMapper extends Mapper<Text, DoubleWritable, Text, DoubleWritable> {
        //Map input data: 10006,20007	3
        @Override
        protected void map(Text key, DoubleWritable value, Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static class MakeSumForMultiplicationReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            //Sum all partial products for the same (user, product) pair
            double sum = 0;
            for (DoubleWritable value : values) {
                sum += value.get();
            }
            context.write(key, new DoubleWritable(sum));
        }
    }
}
```
Result data:
```
10001,20001	10
10001,20002	11
10001,20003	1
10001,20004	2
10001,20005	9
10001,20006	10
... ...
```
Step 7: deduplicate the data by removing the products the user has already purchased from the recommendation results.
Data source:
1. FirstMapper processes the user's purchase list data (the raw data).
2. SecondMapper processes the recommendation result data of step 6.
JavaBean class UserAndGoods:
```java
package com.briup.mr.seven;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 0:24
 */
public class UserAndGoods implements WritableComparable<UserAndGoods> {
    private String userId;
    private String goodsId;
    //A flag of 1 indicates that the data comes from the source data (the user's purchase list)
    //A flag of 0 indicates that the data comes from the result of step 6
    private int flag;

    public UserAndGoods() {
    }

    public UserAndGoods(String userId, String goodsId, int flag) {
        this.userId = userId;
        this.goodsId = goodsId;
        this.flag = flag;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getGoodsId() {
        return goodsId;
    }

    public void setGoodsId(String goodsId) {
        this.goodsId = goodsId;
    }

    @Override
    public int compareTo(UserAndGoods o) {
        int i = this.getUserId().compareTo(o.getUserId());
        //Sort by user id first, then by product id
        if (i != 0) {
            return i;
        } else {
            return this.getGoodsId().compareTo(o.getGoodsId());
        }
    }

    @Override
    public String toString() {
        return "UserAndGoods{" +
                "userId='" + userId + '\'' +
                ", goodsId='" + goodsId + '\'' +
                ", flag=" + flag +
                '}';
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(userId);
        dataOutput.writeUTF(goodsId);
        dataOutput.writeInt(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.userId = dataInput.readUTF();
        this.goodsId = dataInput.readUTF();
        this.flag = dataInput.readInt();
    }
}
```
MapReduce class DuplicateDataForResult:
```java
package com.briup.mr.seven;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 0:19
 */
//Deduplicate the data: remove the products the user has already purchased from the recommendation result.
public class DuplicateDataForResult {

    //FirstMapper processes the user's purchase list data (the raw data).
    //A flag of 1 indicates that the data comes from the source data.
    public static class DuplicateDataForResultFirstMapper extends Mapper<LongWritable, Text, UserAndGoods, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\t");
            context.write(new UserAndGoods(line[0], line[1], 1), value);
        }
    }

    //SecondMapper processes the recommendation result data of step 6.
    //A flag of 0 indicates that the data comes from the result of step 6.
    public static class DuplicateDataForResultSecondMapper extends Mapper<Text, DoubleWritable, UserAndGoods, Text> {
        @Override
        protected void map(Text key, DoubleWritable value, Context context) throws IOException, InterruptedException {
            String[] line = key.toString().split(",");
            context.write(new UserAndGoods(line[0], line[1], 0),
                    new Text(key.toString() + "\t" + value.get()));
        }
    }

    //Expected reduce output: 10001,20004	2
    public static class DuplicateDataForResultReducer extends Reducer<UserAndGoods, Text, Text, NullWritable> {
        int i = 0;

        @Override
        protected void reduce(UserAndGoods key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Iterator<Text> iter = values.iterator();
            System.out.println((i++) + "--" + key);
            //The first element of the group
            Text res = iter.next();
            System.out.println(res.toString());
            //If the group has only one element, the user never bought this product,
            //so the recommendation record is written out; otherwise it is skipped.
            if (!iter.hasNext()) {
                context.write(res, NullWritable.get());
            }
        }
    }
}
```
Calculation results:
```
10001	20004	2
10001	20003	1
10002	20002	2
10002	20007	2
10002	20001	2
10002	20005	2
10003	20006	3
10003	20005	3
10003	20001	3
10003	20004	1
10004	20007	5
10004	20004	1
10004	20003	1
10005	20006	2
10005	20002	2
... ...
```
Step 8: save the recommendation results to the MySQL database
Data source: calculation results in step 7
```java
package com.briup.mr.eight;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/29 16:51
 */
public class DataInDB {

    public static class DataInDBMapper extends Mapper<Text, NullWritable, RecommendResultBean, NullWritable> {
        @Override
        protected void map(Text key, NullWritable value, Context context) throws IOException, InterruptedException {
            //The input key looks like: 10001,20004	2.0
            String[] line = key.toString().split("\t");
            RecommendResultBean outK = new RecommendResultBean();
            outK.setNums(Double.parseDouble(line[1]));
            String[] split = line[0].split(",");
            outK.setUid(Integer.parseInt(split[0]));
            outK.setGid(Integer.parseInt(split[1]));
            context.write(outK, NullWritable.get());
        }
    }

    public static class DataInDBReducer extends Reducer<RecommendResultBean, NullWritable, RecommendResultBean, NullWritable> {
        @Override
        protected void reduce(RecommendResultBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            //Each record is written to the recommend table through DBOutputFormat
            context.write(key, NullWritable.get());
        }
    }
}
```
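The RecommendResultBean class itself is not listed in this article. A minimal sketch of what it might look like, assuming it implements WritableComparable (it is used as a map output key) and DBWritable (so that DBOutputFormat can write it into the recommend(uid, gid, nums) table), is:

```java
package com.briup.mr.eight;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical sketch of the bean used in step 8; the real project class may differ.
public class RecommendResultBean implements WritableComparable<RecommendResultBean>, DBWritable {
    private int uid;
    private int gid;
    private double nums;

    public void setUid(int uid) { this.uid = uid; }
    public void setGid(int gid) { this.gid = gid; }
    public void setNums(double nums) { this.nums = nums; }

    //Hadoop serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(uid);
        out.writeInt(gid);
        out.writeDouble(nums);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        uid = in.readInt();
        gid = in.readInt();
        nums = in.readDouble();
    }

    //JDBC serialization used by DBOutputFormat; the column order matches
    //DBOutputFormat.setOutput(job, "recommend", "uid", "gid", "nums")
    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setInt(1, uid);
        ps.setInt(2, gid);
        ps.setDouble(3, nums);
    }

    @Override
    public void readFields(ResultSet rs) throws SQLException {
        uid = rs.getInt(1);
        gid = rs.getInt(2);
        nums = rs.getDouble(3);
    }

    //Sort by user id, then by product id
    @Override
    public int compareTo(RecommendResultBean o) {
        int c = Integer.compare(uid, o.uid);
        return c != 0 ? c : Integer.compare(gid, o.gid);
    }
}
```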
Result: the recommend table in MySQL is populated with the (uid, gid, nums) recommendation records (the original screenshot of the table contents is not available).
Step 9: build the job flow and submit the job to the cluster for operation
```java
package com.briup.mr;

import com.briup.mr.eight.DataInDB;
import com.briup.mr.eight.RecommendResultBean;
import com.briup.mr.five.GoodsBean;
import com.briup.mr.five.GoodsGroup;
import com.briup.mr.five.GoodsPartitioner;
import com.briup.mr.five.MultiplyGoodsMatrixAndUserVector;
import com.briup.mr.four.UserBuyGoodsVector;
import com.briup.mr.one.UserBuyGoodsList;
import com.briup.mr.seven.DuplicateDataForResult;
import com.briup.mr.seven.UserAndGoods;
import com.briup.mr.six.MakeSumForMultiplication;
import com.briup.mr.three.GoodsCooccurrenceMatrix;
import com.briup.mr.two.GoodsCooccurrenceList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by Intellij IDEA.
 *
 * @author zhudezhong
 * @date 2021/7/27 16:36
 */
public class FinalJob extends Configured implements Tool {

    @Override
    public int run(String[] strings) throws Exception {
        Configuration conf = getConf();

        //HDFS paths (local Windows test paths were replaced by these cluster paths)
        Path input = new Path("/user/zhudz/goods/input.txt");
        Path one_output = new Path("/user/zhudz/goods/out11");
        Path two_output = new Path("/user/zhudz/goods/out22");
        Path three_output = new Path("/user/zhudz/goods/out33");
        Path four_output = new Path("/user/zhudz/goods/out44");
        Path five_output = new Path("/user/zhudz/goods/out55");
        Path six_output = new Path("/user/zhudz/goods/out66");
        Path seven_output = new Path("/user/zhudz/goods/out77");

        //Delete the output paths if they already exist
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(one_output)) {
            fs.delete(one_output, true);
        }
        if (fs.exists(two_output)) {
            fs.delete(two_output, true);
        }
        if (fs.exists(three_output)) {
            fs.delete(three_output, true);
        }
        if (fs.exists(four_output)) {
            fs.delete(four_output, true);
        }
        if (fs.exists(five_output)) {
            fs.delete(five_output, true);
        }
        if (fs.exists(six_output)) {
            fs.delete(six_output, true);
        }
        if (fs.exists(seven_output)) {
            fs.delete(seven_output, true);
        }

        //Step 1: calculate the list of goods purchased by the user
        Job job = Job.getInstance(conf);
        job.setJarByClass(this.getClass());
        job.setJobName("Step1: Calculate the list of items purchased by the user");
        job.setMapperClass(UserBuyGoodsList.UserBuyGoodsListMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(UserBuyGoodsList.UserBuyGoodsListReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        TextInputFormat.addInputPath(job, input);
        SequenceFileOutputFormat.setOutputPath(job, one_output);

        //Step 2: calculate the co-occurrence relationship of goods
        Job job1 = Job.getInstance(conf);
        job1.setJarByClass(this.getClass());
        job1.setJobName("Step2: Calculate the co-occurrence relationship of goods");
        job1.setMapperClass(GoodsCooccurrenceList.GoodsCooccurrenceListMapper.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(NullWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(NullWritable.class);
        job1.setInputFormatClass(SequenceFileInputFormat.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileInputFormat.addInputPath(job1, one_output);
        SequenceFileOutputFormat.setOutputPath(job1, two_output);

        //Step 3: count the co-occurrence times of goods (co-occurrence matrix)
        Job job2 = Job.getInstance(conf);
        job2.setJarByClass(this.getClass());
        job2.setJobName("Step3: Count the co-occurrence times of goods (co-occurrence matrix)");
        job2.setMapperClass(GoodsCooccurrenceMatrix.GoodsCooccurrenceMatrixMapper.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setReducerClass(GoodsCooccurrenceMatrix.GoodsCooccurrenceMatrixReducer.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        job2.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileInputFormat.addInputPath(job2, two_output);
        SequenceFileOutputFormat.setOutputPath(job2, three_output);

        //Step 4: calculate the user's purchase vector
        Job job3 = Job.getInstance(conf);
        job3.setJarByClass(this.getClass());
        job3.setJobName("Step4: Calculate the user's purchase vector");
        job3.setMapperClass(UserBuyGoodsVector.UserBuyGoodsVectorMapper.class);
        job3.setMapOutputKeyClass(Text.class);
        job3.setMapOutputValueClass(Text.class);
        job3.setReducerClass(UserBuyGoodsVector.UserBuyGoodsVectorReducer.class);
        job3.setOutputKeyClass(Text.class);
        job3.setOutputValueClass(Text.class);
        //Read the raw source file
        job3.setInputFormatClass(TextInputFormat.class);
        job3.setOutputFormatClass(SequenceFileOutputFormat.class);
        TextInputFormat.addInputPath(job3, input);
        SequenceFileOutputFormat.setOutputPath(job3, four_output);

        //Step 5: multiply the commodity co-occurrence matrix by the user purchase vector
        Job job4 = Job.getInstance(conf);
        job4.setJarByClass(this.getClass());
        job4.setJobName("Step5: Multiply the co-occurrence matrix by the user purchase vector");
        //Build multiple different map tasks
        //Mapper for the commodity co-occurrence matrix
        MultipleInputs.addInputPath(job4, three_output, SequenceFileInputFormat.class,
                MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorFirstMapper.class);
        //Mapper for the user purchase vector
        MultipleInputs.addInputPath(job4, four_output, SequenceFileInputFormat.class,
                MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorSecondMapper.class);
        job4.setMapOutputKeyClass(GoodsBean.class);
        job4.setMapOutputValueClass(Text.class);
        job4.setPartitionerClass(GoodsPartitioner.class);
        job4.setGroupingComparatorClass(GoodsGroup.class);
        job4.setReducerClass(MultiplyGoodsMatrixAndUserVector.MultiplyGoodsMatrixAndUserVectorReducer.class);
        job4.setOutputKeyClass(Text.class);
        job4.setOutputValueClass(DoubleWritable.class);
        job4.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job4, five_output);

        //Step 6: sum the scattered partial results calculated in step 5
        Job job5 = Job.getInstance(conf);
        job5.setJarByClass(this.getClass());
        job5.setJobName("Step6: Sum the partial results calculated in step 5");
        job5.setMapperClass(MakeSumForMultiplication.MakeSumForMultiplicationMapper.class);
        job5.setMapOutputKeyClass(Text.class);
        job5.setMapOutputValueClass(DoubleWritable.class);
        job5.setReducerClass(MakeSumForMultiplication.MakeSumForMultiplicationReducer.class);
        job5.setOutputKeyClass(Text.class);
        job5.setOutputValueClass(DoubleWritable.class);
        job5.setInputFormatClass(SequenceFileInputFormat.class);
        job5.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileInputFormat.addInputPath(job5, five_output);
        SequenceFileOutputFormat.setOutputPath(job5, six_output);

        //Step 7: remove the products the user already purchased from the recommendation result
        Job job6 = Job.getInstance(conf);
        job6.setJarByClass(this.getClass());
        job6.setJobName("Step7: Remove purchased products from the recommendation result");
        //Build multiple different map tasks
        //FirstMapper processes the user's purchase list data
        MultipleInputs.addInputPath(job6, input, TextInputFormat.class,
                DuplicateDataForResult.DuplicateDataForResultFirstMapper.class);
        //SecondMapper processes the recommendation result data of step 6
        MultipleInputs.addInputPath(job6, six_output, SequenceFileInputFormat.class,
                DuplicateDataForResult.DuplicateDataForResultSecondMapper.class);
        job6.setMapOutputKeyClass(UserAndGoods.class);
        job6.setMapOutputValueClass(Text.class);
        //Set grouping
        // job6.setGroupingComparatorClass(DuplicateDataGroup.class);
        job6.setReducerClass(DuplicateDataForResult.DuplicateDataForResultReducer.class);
        job6.setOutputKeyClass(Text.class);
        job6.setOutputValueClass(NullWritable.class);
        job6.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job6, seven_output);

        //Step 8: save the recommendation results to the MySQL database; the data comes from step 7
        Job job7 = Job.getInstance(conf);
        job7.setJarByClass(this.getClass());
        job7.setJobName("Step8: Save recommendation results to the MySQL database");
        DBConfiguration.configureDB(job7.getConfiguration(),
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://192.168.10.131/briup",
                "root", "root");
        DBOutputFormat.setOutput(job7, "recommend", "uid", "gid", "nums");
        job7.setMapperClass(DataInDB.DataInDBMapper.class);
        job7.setMapOutputKeyClass(RecommendResultBean.class);
        job7.setMapOutputValueClass(NullWritable.class);
        job7.setReducerClass(DataInDB.DataInDBReducer.class);
        job7.setOutputKeyClass(RecommendResultBean.class);
        job7.setOutputValueClass(NullWritable.class);
        job7.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.addInputPath(job7, seven_output);
        job7.setOutputFormatClass(DBOutputFormat.class);

        //Finally: build the job flow
        ControlledJob contro_job = new ControlledJob(conf);
        contro_job.setJob(job);

        ControlledJob contro_job1 = new ControlledJob(conf);
        contro_job1.setJob(job1);
        contro_job1.addDependingJob(contro_job);

        ControlledJob contro_job2 = new ControlledJob(conf);
        contro_job2.setJob(job2);
        contro_job2.addDependingJob(contro_job1);

        ControlledJob contro_job3 = new ControlledJob(conf);
        contro_job3.setJob(job3);

        ControlledJob contro_job4 = new ControlledJob(conf);
        contro_job4.setJob(job4);
        contro_job4.addDependingJob(contro_job2);
        contro_job4.addDependingJob(contro_job3);

        ControlledJob contro_job5 = new ControlledJob(conf);
        contro_job5.setJob(job5);
        contro_job5.addDependingJob(contro_job4);

        ControlledJob contro_job6 = new ControlledJob(conf);
        contro_job6.setJob(job6);
        contro_job6.addDependingJob(contro_job5);

        ControlledJob contro_job7 = new ControlledJob(conf);
        contro_job7.setJob(job7);
        contro_job7.addDependingJob(contro_job6);

        JobControl jobs = new JobControl("goods_recommends");
        jobs.addJob(contro_job);
        jobs.addJob(contro_job1);
        jobs.addJob(contro_job2);
        jobs.addJob(contro_job3);
        jobs.addJob(contro_job4);
        jobs.addJob(contro_job5);
        jobs.addJob(contro_job6);
        jobs.addJob(contro_job7);

        Thread t = new Thread(jobs);
        t.start();

        //Print the job progress until all jobs have finished
        while (true) {
            for (ControlledJob c : jobs.getRunningJobList()) {
                c.getJob().monitorAndPrintJob();
            }
            if (jobs.allFinished()) break;
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new FinalJob(), args));
    }
}
```
Step 10: write shell task scripts on the cluster and schedule them to run periodically
1. Write a script that migrates the data from MySQL to HDFS
```bash
sudo vi mysqlToHDFS.sh
# Add execute permission when finished
sudo chmod +x mysqlToHDFS.sh
```
```bash
#!/bin/bash
sqoop import \
  --connect jdbc:mysql://localhost:3306/briup \
  --username root --password root \
  --delete-target-dir \
  --target-dir /user/hdfs/recommend \
  --query 'select s.user_id,d.product_id,d.num from s_order s,order_line d where s.id=d.order_id and $CONDITIONS' \
  --m 1
```
2. Write a script that submits this project's job to the cluster
```bash
sudo vi recomendMR.sh
# Add execute permission when finished
sudo chmod +x recomendMR.sh
```
```bash
#!/bin/bash
yarn jar /home/hdfs/GoodsRecommend-1.0-SNAPSHOT.jar com.briup.mr.FinalJob
```
3. Set scheduled tasks
```bash
crontab -e

# At 7:00 a.m. every Tuesday, run the task that migrates data from MySQL to HDFS
0 7 * * 2 sh ~/bin/mysqlToHDFS.sh
# At 7:30 a.m. every Tuesday, submit this project's job to the cluster
30 7 * * 2 sh ~/bin/recomendMR.sh
```
Project source code: https://gitee.com/zhu-dezhong/recomendGoods