这篇文章是在完成MIT 6.824 第一周课程之后写的笔记以及Lab1的具体实现过程,有什么错误请各位大佬指出,
代码提交在GitHub上
1. MapReduce论文简读
1.1 编程模型
MapReduce编程模型的原理是:用户首先指定一个Map函数处理一个基于key/value 对的数据集合,输出基于key/value 对的中间数据集合;然后再使用一个reduce函数用来合并上一步中间数据集合具有相同key值的中间value值。采用MapReduce架构可以使那些没有并行计算和分布式处理系统开发经验的程序员有效利用分布式系统的丰富资源,因为他们只用编写Map和Reduce函数的逻辑,而不用关系底层分布式系统是如何实现的。MapReduce有很多例子,比如:分布式的Grep、倒转网络链接图、每个主机的检索词向量、倒排索引、分布式排序等等。
1.2 MapReduce的实现
上图详细了描述了MapReduce详细的流程图,下面序号和图中一 一对应:
- 用户程序首先调用的MapReduce库将输入文件分成M个数据片度,每个数据片段的大小一般从 16MB到64MB(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在机群中创建大量的程序副本。
- 这些程序副本中的有一个特殊的程序–master。副本中其它的程序都是worker程序,由master分配任务。有M个Map任务和R个Reduce任务将被分配,master将一个Map任务或Reduce任务分配给一个空闲的worker,为了节省网络带宽,master都会将map任务分配到有数据的本地的机器上。
- 被分配了map任务的worker程序读取相关的输入数据片段,从输入的数据片段中解析出key/value pAIr,然后把key/value pair传递给用户自定义的Map函数,由Map函数生成并输出的中间key/value pair,并缓存在内存中。
- 缓存中的key/value pair通过分区函数分成R个区域,之后周期性的写入到本地磁盘上。缓存的key/value pair在本地磁盘上的存储位置将被回传给master,由master负责把这些存储位置再传送给Reduce worker。
- 当Reduce worker程序接收到master程序发来的数据存储位置信息后,使用RPC从Map worker所在主机的磁盘上读取这些缓存数据。当Reduce worker读取了所有的中间数据后,通过对key进行排序后使得具有相同key值的数据聚合在一起。由于许多不同的key值会映射到相同的Reduce任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。
- Reduce worker程序遍历排序后的中间数据,对于每一个唯一的中间key值,Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数。Reduce函数的输出被追加到所属分区的输出文件。
- 当所有的Map和Reduce任务都完成之后,master唤醒用户程序。在这个时候,在用户程序里的对MapReduce调用才返回。
在成功完成任务之后,MapReduce的输出存放在R个输出文件中(对应每个Reduce任务产生一个输出文件,文件名由用户指定)。一般情况下,用户不需要将这R个输出文件合并成一个文件–他们经常把这些文件作为另外一个MapReduce的输入,或者在另外一个可以处理多个分割文件的分布式应用中使用。
1.3 Master的作用
Master持有一些数据结构,它存储每一个Map和Reduce任务的状态(空闲、工作中或完成),以及Worker机器(非空闲任务的机器)的标识。
Master就像一个数据管道,中间文件存储区域的位置信息通过这个管道从Map传递到Reduce。因此,对于每个已经完成的Map任务,master存储了Map任务产生的R个中间文件存储区域的大小和位置。当Map任务完成时,Master接收到位置和大小的更新信息,这些信息被逐步递增的推送给那些正在工作的Reduce任务。
1.4 容错
- Map Worker出错:由于已经完成的Map任务的输出存储在这台机器上,Map任务的输出已不可访问了,因此必须重新执行。
- 而已经完成的Reduce任务的输出存储在全局文件系统上,因此不需要再次执行。
- master失效后再恢复是比较麻烦的,因此我们现在的实现是如果master失效,就中止MapReduce运算,或者可以周期性的保存Checkpoint,然后可以从最后一个检查点(checkpoint)开始启动另一个master进程。
2. MIT 6.824 Lab1 2022 Spring 具体实现
2.1 定义worker和Coordinator沟通的数据类型
下面枚举值代表了Coordinator和worker之间的相互通知类型,比如Coordinator给worker发了一个TaskMapDistribute(分配一个Map任务),然后worker看到是给我分配一个Map任务,那么此时woker就应该执行map操作,再比如,worker给Coordinator发了一个TaskRequest(请求一个任务),此时Coordinator就应该决定该给worker分配一个什么任务呢(是Map还是Reduce任务呢)。
// OperateType 定义一个Coordinator和worker之间通信的枚举值
type OperateType int
const (
TaskRequest OperateType = iota //请求一个任务
TaskMapDistribute //分配一个Map任务
TaskReduceDistribute //分配一个Reduce任务
TaskMapDone //一个Map任务完成了
TaskReduceDone //一个Reduce任务完成了
TaskWait //让worker等待任务完成
TaskAllDone //所有Map和Reduce任务都完成了
)
下面定义了worker给Coordinator发信息格式:
Id: 代表了此次请求的唯一标识符,如果请求和回复的标识符不一样,代表不是一个任务了
TaskId:代表了任务唯一id编号
OpType:表示通知Coordinator执行对应操作
type CommonArgs struct {
Id int64 //当前请求的唯一标识符
TaskID int //分配的任务id
OpType OperateType //通知类型
}
下面定义了Coordinator给worker发信息格式:
Id: 代表了此次回复的唯一标识符,如果回复和请求的标识符不一样,代表不是一个任务了,就直接忽略
TaskID: 代表了任务唯一id编号
OpType: 表示通知Coordinator执行对应操作
FileName:表示回复一个文件名称
NReduce: reduce任务的总数,用于让map知道保存到几个临时文件中, NMap: map任务的总数
type CommonReply struct {
Id int64 //当前请求的唯一标识符
TaskID int //分配的任务id
OpType OperateType //通知类型
FileName string //回复一个文件名称
NReduce int //reduce任务的总数,用于让map知道保存到几个临时文件中
NMap int //map任务的总数
}
2.2 Coordinator的定义
下面定义了Coordinator的数据类型:
type Coordinator struct {
mapQueue chan int //map任务的TaskId编号队列
reduceQueue chan int //reduce任务的TaskId编号队列
mapTaskRunning []int64 //表示对应TaskId编号的map任务运行状态
reduceTaskRunning []int64 //表示对应TaskId编号的reduce任务运行状态
files []string //表示任务,其实就是写文件名称,每次分配一个TaskId时,就从对饮TaskId位置取一个文件名称给Worker
nReduce int //表示reduce任务总数,也就是每次map完成后,保存几个临时文件,如果为10,就保存10个临时文件
nMap int //表示map任务总数
mapNotFinishTaskCnt int //表示未完成的map任务,包括正在运行和为分配的任务
reduceNotFinishCnt int //表示未完成的reduce任务,包括正在运行和为分配的任务
mu sync.RWMutex //读写锁,防止并发竞争
allTaskDone bool //任务是否全部完成
}
接下来就是定义Coordinator如何处理对应worker请求了:
下面注释很全了
/Req :args:代表请求参数
//Req :reply代表回复参数
func (c *Coordinator) Req(args *CommonArgs, reply *CommonReply) error {
//所有任务都完成的话,通知所有worker并退出当前Coordinator
if c.allTaskDone {
reply.TaskID = args.TaskID
reply.OpType = TaskAllDone
return nil
}
//处理worker发来的消息类型枚举值
switch args.OpType {
case TaskRequest:
reply.Id = args.Id
reply.NMap = c.nMap
reply.NReduce = c.nReduce
//如果还有未分配的map任务
if len(c.mapQueue) > 0 {
reply.TaskID = <-c.mapQueue
reply.OpType = TaskMapDistribute
reply.FileName = c.files[reply.TaskID]
atomic.StoreInt64(&(c.mapTaskRunning[reply.TaskID]), args.Id)
// 用于在10s后检测worker是否完成,如果未完成,就把任务id重新放入队列,以便让其他人完成
go func(taskId int) {
time.Sleep(10 * time.Second)
//mapTaskRunning保存了当前任务的状态,当worker完成了会把当前taskId位置的值设置未1,如果没有设置,代表任务没完成
if c.mapTaskRunning[taskId] != 1 {
c.mapQueue <- taskId
} else {
c.mu.Lock()
//把任务id重新放入队列,以便让其他人完成
c.mapNotFinishTaskCnt--
c.mu.Unlock()
}
}(reply.TaskID)
} else {
//没有未分配的map任务,但是有正在运行的任务taskId,则回复TaskWait,让worker等待,通常让worker进行time.sleep()操作
if c.mapNotFinishTaskCnt > 0 {
reply.OpType = TaskWait
} else {
// 如果map所有任务都完成了,并且reduce有未分配的任务,则给worker分配一个任务
if len(c.reduceQueue) > 0 {
reply.TaskID = <-c.reduceQueue
reply.OpType = TaskReduceDistribute
atomic.StoreInt64(&c.reduceTaskRunning[reply.TaskID], args.Id)
// 用于在10s后检测worker是否完成,如果未完成,就把任务id重新放入队列,以便让其他人完成
go func(taskId int) {
time.Sleep(10 * time.Second)
//reduceTaskRunning保存了当前任务的状态,当worker完成了会把当前taskId位置的值设置未1,如果没有设置,代表任务没完成
if c.reduceTaskRunning[taskId] != 1 {
//把任务id重新放入队列,以便让其他人完成
c.reduceQueue <- taskId
} else {
c.mu.Lock()
c.reduceNotFinishCnt--
c.mu.Unlock()
if c.reduceNotFinishCnt == 0 {
c.mu.Lock()
c.allTaskDone = true
c.mu.Unlock()
}
}
}(reply.TaskID)
} else {
//reduce没有待分配的任务,但是reduce还有正在运行的任务,则通知worker等待
if c.reduceNotFinishCnt > 0 {
reply.OpType = TaskWait
} else {
c.mu.Lock()
c.allTaskDone = true
c.mu.Unlock()
}
}
}
}
case TaskMapDone:
//Map任务完成了,则把mapTaskRunning对应taskId位置的值设置未1
if args.Id == c.mapTaskRunning[args.TaskID] {
atomic.StoreInt64(&c.mapTaskRunning[args.TaskID], 1)
}
case TaskReduceDone:
//Reduce任务完成了,则把reduceTaskRunning对应taskId位置的值设置未1
if args.Id == c.reduceTaskRunning[args.TaskID] {
atomic.StoreInt64(&(c.reduceTaskRunning[args.TaskID]), 1)
}
default:
return nil
}
return nil
}
下面是用于告诉 MIT官方给的检测器 何时停止的函数,如果不写的话,他会一直运行:
func (c *Coordinator) Done() bool {
ret := false
c.mu.Lock()
ret = c.allTaskDone
c.mu.Unlock()
return ret
}
接下来就是告诉 MIT官方给的检测器如果产生我的Coordinator
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}
// Your code here.
fileLength := len(files)
c.mapQueue = make(chan int, fileLength)
c.reduceQueue = make(chan int, nReduce)
c.mapTaskRunning = make([]int64, fileLength)
c.reduceTaskRunning = make([]int64, nReduce)
c.files = files
c.nReduce = nReduce
c.nMap = fileLength
c.mapNotFinishTaskCnt = fileLength
c.reduceNotFinishCnt = nReduce
for i := 0; i < fileLength; i++ {
c.mapQueue <- i
}
for i := 0; i < nReduce; i++ {
c.reduceQueue <- i
}
c.server()
return &c
}
2.3 Worker定义
Worker的定义完全可以参考官方给的mr/mrsequential.go文件,只需要稍微改下。
map任务步骤大致可以如下描述:
- 打开文件Coordinator返回回来的文件
- 读取文件中所有内容
- 调用用户自定义个map函数,它会返回官方定义的KeyValue类型的数据
- 根据nReduce创建文件,比如nReduce为10,就创建10个文件,用于循环保存hash值相同的文件,ihash(kv.Key) % nReduce
- 通知Coordinator已经完成一个Map任务
首先定义我们的map任务,map任务定义代码如下:
func MapTask(reply *CommonReply, mapf func(string, string) []KeyValue, timestamp int64) bool {
// 打开文件
fileName := reply.FileName
file, err := os.Open(fileName)
defer func(file *os.File) {
err := file.Close()
if err != nil {
log.Fatalf(&#34;close file %s&#34;, fileName)
}
}(file)
if err != nil {
log.Fatalf(&#34;can&#39;t open %s&#34;, fileName)
}
// 读取文件中所有内容
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf(&#34;can&#39;t read %s&#34;, fileName)
}
//调用用户自定义个map函数
intermediate := mapf(fileName, string(content))
//根据nReduce创建文件
ofile := make([]*os.File, reply.NReduce)
for i := 0; i < reply.NReduce; i++ {
ofname := &#34;mr-&#34; + strconv.Itoa(reply.TaskID) + &#34;-&#34; + strconv.Itoa(i)
ofile, _ = os.Create(ofname)
defer func(file *os.File) {
err := file.Close()
if err != nil {
log.Fatalf(&#34;file.Close() error!&#34;)
}
}(ofile)
}
//把hash值相同的key写入对应文件中
for _, kv := range intermediate {
reduceId := ihash(kv.Key) % reply.NReduce
enc := json.NewEncoder(ofile[reduceId])
err := enc.Encode(&kv)
if err != nil {
log.Fatalf(&#34;can not read %v&#34;, ofile[reduceId])
}
}
//告诉Coordinator已经完成一个Map任务
args := CommonArgs{}
args.Id = timestamp
args.OpType = TaskMapDone
args.TaskID = reply.TaskID
replyArgs := CommonReply{}
return call(&#34;Coordinator.Req&#34;, &args, &replyArgs)
}
官方定义的KeyValue数据类型、hash函数,以及利用KeyValue进行排序的代码如下,上面代码产生的中间值类型就是官方定义的KeyValue。
type KeyValue struct {
Key string
Value string
}
type ByKey []KeyValue
func (a ByKey) Len() int { return len(a) }
func (a ByKey) Swap(i, j int) { a, a[j] = a[j], a }
func (a ByKey) Less(i, j int) bool { return a.Key < a[j].Key }
func ihash(key string) int {
h := fnv.New32a()
_, err := h.Write([]byte(key))
if err != nil {
log.Fatalf(&#34;h.Write error!&#34;)
}
return int(h.Sum32() & 0x7fffffff)
}
接下就是Reduce任务:
步骤大致描述如下;
- 把map产生的中间文件按顺序取出,并将1其保存到keyValue数组中
- 根据key值进行排序操作
- 创建一个输出文件用于保存数据
- 将排序后的数据写入文件,参考mr/mrsequential.go文件
- 通知Coordinator,已经完成一个Reduce任务
具体代码如下:
func reduceTask(reply *CommonReply, reducef func(string, []string) string, timestamp int64) bool {
// 把map产生的中间文件按顺序取出,并将1其保存到keyValue数组中
var kva []KeyValue
for i := 0; i < reply.NMap; i++ {
ifilename := &#34;mr-&#34; + strconv.Itoa(i) + &#34;-&#34; + strconv.Itoa(reply.TaskID)
ifile, err := os.Open(ifilename)
defer func(ifile *os.File) {
err := ifile.Close()
if err != nil {
}
}(ifile)
// open file error
if err != nil {
log.Fatalf(&#34;Open File Error %s.&#34;, ifilename)
}
// read all intermediate data from the file
dec := json.NewDecoder(ifile)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv)
}
}
//根据key值进行排序操作
sort.Sort(ByKey(kva))
// 创建一个输出文件用于保存数据
ofilename := &#34;mr-out-&#34; + strconv.Itoa(reply.TaskID)
ofile, err := os.Create(ofilename)
if err != nil {
log.Fatalf(&#34;Creat Open File Error.&#34;)
}
defer func(ofile *os.File) {
err := ofile.Close()
if err != nil {
}
}(ofile)
//将排序后的数据写入文件,参考mr/mrsequential.go文件
i := 0
for i < len(kva) {
j := i + 1
for j < len(kva) && kva[j].Key == kva.Key {
j++
}
var values []string
for k := i; k < j; k++ {
values = append(values, kva[k].Value)
}
output := reducef(kva.Key, values)
// this is the correct format for each line of Reduce output.
_, err := fmt.Fprintf(ofile, &#34;%v %v\n&#34;, kva.Key, output)
if err != nil {
return false
}
i = j
}
//通知Coordinator,已经完成一个Reduce任务
args := CommonArgs{}
args.Id = timestamp
args.OpType = TaskReduceDone
args.TaskID = reply.TaskID
replyArgs := CommonReply{}
return call(&#34;Coordinator.Req&#34;, &args, &replyArgs)
}
接下来就是最后一步,完成woker函数
woker就是在一个死循环中不断向Coordinator请求一个任务,根据Coordinator返回的OpType执行对应操作即可。
具体代码如下:
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
// 不断请求任务,然后完成任务
for {
//请求一个任务
timestamp := time.Now().Unix()
args := CommonArgs{}
args.Id = timestamp
args.OpType = TaskRequest
reply := CommonReply{}
ok := call(&#34;Coordinator.Req&#34;, &args, &reply)
//检测是否是同一个请求
if reply.Id != timestamp {
return
}
if ok {
switch reply.OpType {
//如果Coordinator分配一了一map任务,则执行map函数即可
case TaskMapDistribute:
MapTask(&reply, mapf, timestamp)
//如果Coordinator分配一了一Reduce任务,则执行Reduce函数即可
case TaskReduceDistribute:
reduceTask(&reply, reducef, timestamp)
// 告诉worker等待
case TaskWait:
//worker等待1s
time.Sleep(time.Second)
case TaskAllDone:
return
default:
return
}
} else {
log.Fatalf(&#34;Coordinator Server has been closed.&#34;)
return
}
time.Sleep(time.Second)
}
}
2.3 运行MIT的检测程序(必须在linux或者macos等类unix环境下,我是下了一个虚拟机,并且要求go为1.13):
上一篇:河北金融学院为什么不叫大学,真的那么差吗? 下一篇:美国排名前30开设金融学和金融工程的学校都有哪些?各自 ... |