摘要
网络编程实验课程要求必须写一个套接字的应用程序,考虑到之前写过的单词发音抓取程序的效率比较低下,就顺便结合套接字做一个分布式的抓取软件。其中涉及到动态任务领取,负载均衡,多线程,加锁解锁,简单的HTML代码解析,文件读写等功能。程序还是使用Python完成,对于学习Python、套接字编程、分布式编程甚至集群编程都有一定的意义。
另外,此软件具有一定的攻击性,如果启动的从节点数量过多,并且每个节点上启动的线程数量过大,那么提供单词发音的服务器可能承受不住压力。所以,此软件对于编写web服务器的负载压力测试有一定的参考意义。
下面是系统结构图:
图1 系统结构图
图1为系统结构图,整个系统由主节点和从节点两大部分构成,主节点上有四种不同类型的线程,分别为:
l 任务侦听线程:负责接受从节点上的任务请求线程,需要注意的是,任务侦听线程并不负责和请求线程通信,而是启动一个新的单词分发线程负责和请求线程通信。
l 单词分发线程池:为从节点上的请求线程服务的主节点上的线程的集合。
l 文件侦听线程:负责接受从节点上的数据回传请求,需要注意的是,文件侦听线程并不负责和请求线程通信,而是启动一个新的写磁盘线程和请求线程通信。
l 写磁盘线程:为从节点上的请求线程服务的主节点上的线程的集合。
客户端也是使用并发的方式实现,一共有以下三类线程:
l 请求线程:当单词队列小于一定的阈值时,请求线程负责向主节点的任务侦听线程请求任务。
l 下载线程池,负责现在单词音频文件的线程的集合
l 文件回传线程:负责将写入本地磁盘上的文件回传到主节点上的线程
另外,系统中还有几个关键的数据结构:
l 主节点上的全部单词列表,其中存储了所有的需要下载发音文件对应的单词,该数据结构是一个队列。
l 从节点上的部分单词列表,其中存储了从节点上需要下载的部分单词,该数据结构是一个队列。
l 从节点上的已下载的mp3文件名称列表,其中存储了已经下载的mp3文件的路径,该数据结构为一个队列。
下面为具体的编码实现,仅为核心代码没有贴所有代码。如果有需要可以给我发邮件索取~
任务侦听线程
- class WordDispatchThread(threading.Thread):
-
- def __init__(self,wordsList):
- threading.Thread.__init__(self)
- self.wordSocket = socket(AF_INET,SOCK_STREAM)
- self.wordSocket.bind(('',wordSocketPort))
- self.wordSocket.listen(maxConnectNum)
- self.wordsList = wordsList
-
- def run(self):
- while True:
- clientSock,address = self.wordSocket.accept()
- print 'connect from %s is established.' % str(address)
- wordWorker = WordServiceThread(clientSock,self.wordsList)
- wordWorker.start()
说明:在该类的构造函数中,将创建侦听套接字,并将全部单词列表作为类的参数成员。线程运行为一个死循环,其中启动accept方法,收到任务请求线程的请求之后,将启动任务分发线程为其服务。
该线程的状态转换图如图2所示:
图2 任务侦听线程状态转换图
文件侦听线程
- class FileStoreThread(threading.Thread):
-
- def __init__(self,wordsDownloaded):
- threading.Thread.__init__(self)
- self.fileSocket = socket(AF_INET,SOCK_STREAM)
- self.fileSocket.bind(('',fileSocketPort))
- self.fileSocket.listen(maxConnectNum)
- self.wordsList = wordsDownloaded
-
- def run(self):
- while True:
- clientSock,address = self.fileSocket.accept()
- print 'connect from %s is established.' % str(address)
- fileWorker = FileServiceThread(clientSock,self.wordsList)
- fileWorker.start()
说明:在该类的构造函数中,将创建侦听套接字,并将已下载的单词列表作为类的参数成员。线程运行为一个死循环,其中启动 accept 方法,收到任务请求线程的请求之后,将启动写磁盘线程为其服务。 图3 文件侦听线程的状态转换图
任务分发线程
- class WordServiceThread(threading.Thread):
-
- def __init__(self,clientSocket,wordsList):
- threading.Thread.__init__(self)
- self.clientSocket = clientSocket
- self.wordsList = wordsList
-
- def run(self):
- global wordsListMutex
- while True:
- request = self.clientSocket.recv(maxWordLength)
- requestStr = request.decode('ascii')
- if(requestStr == 'please'):
- tmpWordsList = ''
- wordsListMutex.acquire()
- for i in range(0,taskGrain):
- tmpWord = self.wordsList.front()
- tmpWordsList = tmpWordsList + ' ' + tmpWord
- wordsListMutex.release()
- self.clientSocket.send(tmpWordsList)
- else:
- self.clientSocket.close()
- break
说明:在该类的构造函数中得到为请求线程服务的套接字,以及全部单词列表的队列。在run中执行任务发送协议。首先受到一个请求字符串,应该为‘please’,否则表示应该在服务器端关闭为其服务的套接字。
收到please之后,从所有单词队列中取出一定数量的单词拼成一个字符串发给送从节点的请求线程。
该线程的执行逻辑对应的流程图如图4所示:
图4 任务分发线程
写磁盘线程
- class FileServiceThread(threading.Thread):
- global soundQueueMutex
- def __init__(self,clientSocket,wordsDownloaded):
- threading.Thread.__init__(self)
- self.socket = clientSocket
- self.wordsDownloaded = wordsDownloaded
-
- def run(self):
- while True:
- rawSaveName = self.socket.recv(maxWordLength)
- word = rawSaveName.decode('ascii')
- if(word == ''):
- continue
- if(word == '0'):
- break
- saveName = word + '.mp3'
- outfd = open(saveName,'wb')
- helloPacket = 'OK'
- helloPacketStr = helloPacket.encode('ascii')
- self.socket.send(helloPacketStr)
- while True:
- data = self.socket.recv(dataBlockSize)
- outfd.write(data)
- query = 'any more?'
- self.socket.send(query.encode('ascii'))
- answer = self.socket.recv(maxWordLength)
- answer = answer.decode('ascii')
- if(answer == 'yes'):
- continue
- else:
- break
- print 'received %s successfully...' % saveName
- outfd.close()
说明:在构造函数中获取到为客户端文件回写线程服务的套接字,通过该套接字执行数据回写协议,将数据写回到磁盘上。 图5 写磁盘线程
任务请求线程
- class TaskRequestThread(threading.Thread):
- global wordQueueMutex
-
- def __init__(self,wordQueue):
- threading.Thread.__init__(self)
- self.requestSocket = socket(AF_INET,SOCK_STREAM)
- self.taskQueue = wordQueue
- self.requestSocket.connect(('localhost',wordSocketPort))
-
- def run(self):
- while True:
- wordQueueMutex.acquire()
- if(len(self.taskQueue) < downloadWokerNum ):
- requestStr = 'please'
- self.requestSocket.send(requestStr.encode('ascii'))
- rawStr = self.requestSocket.recv(rawStrLen) #group 10 words once
- wordsStr = rawStr.decode('ascii')
- wordsList = wordsStr.split()
- for word in wordsList:
- self.taskQueue.append(word)
- exit = True
- for word in self.taskQueue:
- if word != '0':
- exit = False
- if(exit == True):
- self.requestSocket.send('0')
- self.requestSocket.close()
- wordQueueMutex.release()
- break
- wordQueueMutex.release()
- time.sleep(taskRequestWorkerSleep)
说明:请求线程定期扫描任务队列中任务的数量,如果小于一定的阈值,则从主节点上请求新的单词列表任务。否则,线程进入睡眠。如果当前队列中的所有单词都为‘ 0 ’,说明主节点上已经没有单词任务了,这时可以退出了。 图6 任务请求线程
下载线程
- class DownloadThread(threading.Thread):
- global wordQueueMutex
- global fileQueueMutex
-
- def __init__(self,wordQueue,fileQueue):
- threading.Thread.__init__(self)
- self.wordQueue = wordQueue
- self.fileQueue = fileQueue
-
- def run(self):
- while True:
- time.sleep(2) #delete this when presenting a demostration
- word = ''
- wordQueueMutex.acquire()
- while (len(self.wordQueue) == 0):
- wordQueueMutex.release()
- time.sleep(downloadWorkerSleep)
- wordQueueMutex.acquire()
- if(self.wordQueue[0]=='0'):
- wordQueueMutex.release()
- fileQueueMutex.acquire()
- self.fileQueue.append('0')
- fileQueueMutex.release()
- break;
- else:
- word = self.wordQueue.pop(0)
- wordQueueMutex.release()
- url = "http://www.dwds.de/?qu="+word
- urlContent = urllib2.urlopen(url).read()
- #print urlContent
- urlList = re.findall('filename=http://media.dwds.de/dwds/media/sound/dwdswb_aussprache/.*\.mp3', urlContent)
- try:
- finalUrl = urlList[0][9:]
- #print finalUrl
- soundData = urllib2.urlopen(finalUrl).read()
- saveName=word+'.mp3'+'.local'
- #print saveName
- outfd = open(saveName,'wb')
- outfd.write(soundData)
- outfd.close()
- fileQueueMutex.acquire()
- self.fileQueue.append(word)
- fileQueueMutex.release()
- print '%s: OK' % word
- except:
- print '%s: FAILED' % word
- finally:
- pass
说明:下载线程在构造函数中就获得了任务列表队列和已下载文件队列,其处理过程前面已经叙述过了。首先从当前任务队列中取出一个单词,如果单词为’0’,表示已经没有任务了,这时线程退出。否则,就需要去构建单词页面的URL,然后分析页面的HTML代码,使用正则表达式找到单词音频文件的URL,接着将数据读入内存并写入磁盘。另外注意的是,下载线程在退出的时候会给已下载单词队列中写入’0’,以通知回传线程退出。
下载线程的执行逻辑对应的流程图如图7所示:
图7 下载线程
文件回传线程
- class FileTransferThread(threading.Thread):
- global fileQueueMutex
-
- def __init__(self,fileQueue):
- threading.Thread.__init__(self)
- self.fileQueue = fileQueue;
- self.fileSocket = socket(AF_INET,SOCK_STREAM)
- self.fileSocket.connect(('localhost',fileSocketPort))
- self.exitCounter = 0
-
- def run(self):
- while True:
- fileQueueMutex.acquire()
- while(len(self.fileQueue)==0):
- fileQueueMutex.release()
- time.sleep(fileWorkerSleep)
- fileQueueMutex.acquire()
- word = self.fileQueue.pop(0)
- fileQueueMutex.release()
- if(word == '0'):
- self.exitCounter = self.exitCounter + 1
- if (self.exitCounter == downloadWokerNum):
- self.fileSocket.send(word.encode('ascii'))
- self.fileSocket.close()
- break
- else:
- continue
- self.fileSocket.send(word.encode('ascii'))
- response = self.fileSocket.recv(helloLength)
- responseStr = response.decode('ascii')
- if(responseStr != 'OK'):
- self.fileSocket.close()
- continue
- saveName = word + '.mp3' + '.local'
- infd = open(saveName,'rb')
- data = infd.read(dataBlockSize)
- while True:
- self.fileSocket.send(data)
- query = self.fileSocket.recv(maxWordLength)
- answer = ''
- data = infd.read(dataBlockSize)
- if not data:
- answer = 'no'
- self.fileSocket.send(answer.encode('ascii'))
- break
- else:
- answer = 'yes'
- self.fileSocket.send(answer.encode('ascii'))
- infd.close()
说明:回传线程在构造函数中获得了已下载的单词的队列。在运行的过程中,首先判断当前的已下载单词队列中是否有文件名,如果有则立即回传数据。下载线程在退出的时候会给已下载单词队列中写入’0’
,以通知回传线程退出。回传线程会统计获得的0
的数量,如果统计的数量等于下载线程的数量,表示下载线程全部退出,同时文件已经回传完毕。这时,回传线程也可以退出了。 图8 文件回传线程
单词列表类
- class WordsList:
-
- def __init__(self):
- if(len(sys.argv)<2):
- print 'Usage: %s filename' % sys.argv[0]
- sys.exit(-1)
- filePath = sys.argv[1]
- self.t = []
- try:
- for line in fileinput.input(filePath):
- wordLen = len(line)
- if( wordLen > 1 and line[wordLen-1] == '\n'):
- word = line[0:wordLen-1]
- self.t.append(word)
- else:
- self.t.append(line)
- self.t.append('0')
- except:
- print 'constructing words list error.'
- print 'maybe the provided file path is wrong.Check it twice.'
- sys.exit(-2)
- finally:
- pass
- print 'constructing words successfully...'
-
- def front(self):
- if(self.t[0]!='0'):
- return self.t.pop(0)
- else:
- return self.t[0]
说明:该类在构造函数中,从命令行上提供的文件名中解析出所有的单词,并存放在一个列表中,在最后加入一个 ’0’ 表示已经没有单词了。此类中还提供一个方法 --front ,用于从单词队列中获取队首的单词,如果没有单词的话,返回为 ’0’ 。这一点非常重要,用于控制从节点线程的结束。 总结
今年一月份的时候,准备背诵德语四级单词,但是很多单词发音记不清楚,所以找到了一个单词发音网站。当时,想将这些音频文件存放在我的mp3播放器中,这样我可以随时随地的背诵记忆了,所以用Python写了一个简单的单词发音抓取程序,但是抓取效率不是很高,速度比较慢。而我的研究方向为高性能计算,现在主要是集群计算,所以结合网络课程上要求的套接字网络编程实验和我的研究方向,完成了此软件的开发。
该软件的目标很明确,就是以服务器能否提供的最大负载来下载单词对应的音频文件。此软件有一定的攻击性,如果这个软件可以通过因特网传播,并在后台隐秘的运行,则完全可以使提供单词下载的网站瘫痪。
在实际编程中,遇到了一些比较棘手的问题。特别是文件传输过程,虽然可以使用ftp协议传输,但是为了保持软件的最小依赖性,还是实现了一个简单的文件传输协议。因为Karn算法的原因,有些字节被缓冲在缓存中导致接受过程混乱等问题都是之前没有遇到过的。这些问题通过自定义的一些简单的协议得以解决。
之前还想添加一个语音提示的功能,在某个单词下载完毕之后,语音提示该单词已经下载完成。但考虑到多节点下载时单词发音反而会成为瓶颈,所以放弃了这个功能的实现。
此软件仅仅作为一个演示软件,并没有在实际的集群上运行过。软件的实际运行环境还是做了很多的假设,并不能保证深度测试的时候不出现故障。
关于软件的并发,由于希望此软件能够运行在集群上,所以就必须考虑负载均衡的问题。这里使用主从模式,服务器作为主节点,其他节点作为从节点。为了做到负载均衡,使用动态任务分配的并行模式,每个从节点在没有任务的时候向主节点索取,而不是被动的从主节点接受。动态任务分配的模式可以保证各个节点都处于繁忙状态,最小化负载不均带来的问题。
本文转自hipercomer 51CTO博客,原文链接:http://blog.51cto.com/hipercomer/874996