python3+PyQt5实现支持多线程的页面索引器应用程序

脚本专栏 发布日期:2025/1/22 浏览次数:1

正在浏览:python3+PyQt5实现支持多线程的页面索引器应用程序

本文通过Python3+PyQt5实现了《Python Qt GUI快速编程》第19章的页面索引器应用程序例子。

/home/yrd/eric_workspace/chap19/walker_ans.py

#!/usr/bin/env python3

import codecs
import html.entities
import re
import sys
from PyQt5.QtCore import (QMutex, QThread,pyqtSignal,Qt)

class Walker(QThread):
    """Worker thread that indexes the words of a batch of HTML files.

    For every file in ``files`` the thread reads the text, strips HTML
    tags, decodes character entities, lower-cases the result and splits it
    into words. Words of acceptable length and shape that are not already
    in ``commonWords`` are recorded in ``filenamesForWords`` (word -> set
    of filenames). A word whose set has grown beyond
    ``COMMON_WORDS_THRESHOLD`` is removed from the mapping and added to
    ``commonWords`` instead.

    ``filenamesForWords`` and ``commonWords`` are only touched while
    holding ``lock`` (via lockForRead()/lockForWrite()); the ``stopped``
    flag is guarded by a private QMutex.

    Signals:
        indexed(str, int): emitted after each processed file with the
            filename and this walker's ``index``.
        finished(bool, int): emitted at the end of run() with the
            ``completed`` flag and this walker's ``index``.
    """

    # NOTE(review): this reuses the name of QThread's own ``finished``
    # signal, presumably on purpose so that slots receive
    # (completed, index) -- confirm before renaming.
    finished = pyqtSignal(bool, int)
    indexed = pyqtSignal(str, int)
    COMMON_WORDS_THRESHOLD = 250
    MIN_WORD_LEN = 3
    MAX_WORD_LEN = 25
    INVALID_FIRST_OR_LAST = frozenset("0123456789_")
    STRIPHTML_RE = re.compile(r"<[^>]*?>", re.IGNORECASE|re.MULTILINE)
    # Matches a named entity ("&name;", group 1) or a decimal numeric
    # entity ("&#123;", group 2). Exactly one group participates in a
    # match, which is why the replacement callback reads
    # match.group(match.lastindex).
    ENTITY_RE = re.compile(r"&(\w+?);|&#(\d+?);")
    SPLIT_RE = re.compile(r"\W+", re.IGNORECASE|re.MULTILINE)

    def __init__(self, index, lock, files, filenamesForWords,
                 commonWords, parent=None):
        """Store the shared state this walker operates on.

        Args:
            index: value passed back as the second argument of both
                signals so the receiver can identify this walker.
            lock: object providing lockForRead(), lockForWrite() and
                unlock(); guards ``filenamesForWords`` and ``commonWords``.
            files: iterable of filenames to index.
            filenamesForWords: mapping from word to a set of filenames;
                indexed with ``[word]`` for words that may be absent, so it
                must create missing entries on access.
            commonWords: set of words that are no longer indexed.
            parent: optional parent object passed to QThread.
        """
        super(Walker, self).__init__(parent)
        self.index = index
        self.lock = lock
        self.files = files
        self.filenamesForWords = filenamesForWords
        self.commonWords = commonWords
        self.stopped = False
        self.mutex = QMutex()
        self.completed = False

    def stop(self):
        """Ask the thread to stop at its next check of the stop flag."""
        # Acquire outside the try so a failed lock() is never "unlocked".
        self.mutex.lock()
        try:
            self.stopped = True
        finally:
            self.mutex.unlock()

    def isStopped(self):
        """Return True once stop() has been called."""
        self.mutex.lock()
        try:
            return self.stopped
        finally:
            self.mutex.unlock()

    def run(self):
        """Thread entry point: index the files, then emit ``finished``."""
        self.processFiles()
        self.stop()
        self.finished.emit(self.completed, self.index)

    def processFiles(self):
        """Index every file in ``self.files``.

        Returns early, leaving ``self.completed`` False, as soon as
        isStopped() reports True. Files that cannot be opened or read are
        reported on stderr and skipped. ``self.completed`` is set to True
        only after the last file has been handled.
        """
        def unichrFromEntity(match):
            # Replacement callback for ENTITY_RE: decode one entity, or
            # drop it (empty string) when it cannot be decoded.
            text = match.group(match.lastindex)
            if text.isdigit():
                try:
                    return chr(int(text))
                except (ValueError, OverflowError):
                    # Code point outside the Unicode range, or a "digit"
                    # character that int() does not accept.
                    return ""
            codepoint = html.entities.name2codepoint.get(text)
            return chr(codepoint) if codepoint is not None else ""

        for fname in self.files:
            if self.isStopped():
                return
            try:
                with codecs.open(fname, "r", "UTF8", "ignore") as fh:
                    text = fh.read()
            except EnvironmentError as e:
                sys.stderr.write("Error: {0}\n".format(e))
                continue
            if self.isStopped():
                return

            text = self.STRIPHTML_RE.sub("", text)
            text = self.ENTITY_RE.sub(unichrFromEntity, text)
            text = text.lower()

            # Collect this file's candidate words first so the write lock
            # below is taken once per distinct word only.
            words = set()
            for word in self.SPLIT_RE.split(text):
                if (self.MIN_WORD_LEN <= len(word) <= self.MAX_WORD_LEN
                        and word[0] not in self.INVALID_FIRST_OR_LAST
                        and word[-1] not in self.INVALID_FIRST_OR_LAST):
                    self.lock.lockForRead()
                    try:
                        new = word not in self.commonWords
                    finally:
                        self.lock.unlock()
                    if new:
                        words.add(word)
            if self.isStopped():
                return

            for word in words:
                self.lock.lockForWrite()
                try:
                    fileset = self.filenamesForWords[word]
                    if len(fileset) > self.COMMON_WORDS_THRESHOLD:
                        # Too widespread to be useful: stop indexing it.
                        del self.filenamesForWords[word]
                        self.commonWords.add(word)
                    else:
                        fileset.add(str(fname))
                finally:
                    self.lock.unlock()
            self.indexed.emit(fname, self.index)
        self.completed = True


/home/yrd/eric_workspace/chap19/pageindexer_ans.pyw

#!/usr/bin/env python3

import collections
import os
import sys
from PyQt5.QtCore import (QDir, QReadWriteLock, QMutex,Qt)
from PyQt5.QtWidgets import (QApplication, QDialog, QFileDialog, QFrame,
        QHBoxLayout, QLCDNumber, QLabel, QLineEdit, QListWidget,
        QPushButton, QVBoxLayout)
import walker_ans as walker


def isAlive(qobj):
    """Return True if sip can still unwrap *qobj*.

    sip.unwrapinstance() raising RuntimeError is treated as "the wrapped
    object is gone" and yields False.
    """
    # PyQt5 5.11 and later ship sip as the private module PyQt5.sip; older
    # installations provide a top-level ``sip`` module instead.
    try:
        from PyQt5 import sip
    except ImportError:
        import sip
    try:
        sip.unwrapinstance(qobj)
    except RuntimeError:
        return False
    return True


class Form(QDialog):
    """Dialog that indexes the HTML files under a directory.

    Files ending in ".htm"/".html" below the chosen path are handed to
    ``walker.Walker`` threads in batches of up to 1000. The walkers fill
    ``filenamesForWords`` (word -> set of filenames) and ``commonWords``
    while holding ``self.lock``; the dialog reads both under the same lock
    to answer word look-ups and to refresh its counters.
    """

    def __init__(self, parent=None):
        """Build the widgets, lay them out and connect the signals."""
        super(Form, self).__init__(parent)

        self.mutex = QMutex()
        self.fileCount = 0
        self.filenamesForWords = collections.defaultdict(set)
        self.commonWords = set()
        self.lock = QReadWriteLock()
        self.path = QDir.homePath()
        pathLabel = QLabel("Indexing path:")
        self.pathLabel = QLabel()
        self.pathLabel.setFrameStyle(QFrame.StyledPanel|QFrame.Sunken)
        self.pathButton = QPushButton("Set &Path...")
        self.pathButton.setAutoDefault(False)
        findLabel = QLabel("&Find word:")
        self.findEdit = QLineEdit()
        findLabel.setBuddy(self.findEdit)
        commonWordsLabel = QLabel("&Common words:")
        self.commonWordsListWidget = QListWidget()
        commonWordsLabel.setBuddy(self.commonWordsListWidget)
        filesLabel = QLabel("Files containing the &word:")
        self.filesListWidget = QListWidget()
        filesLabel.setBuddy(self.filesListWidget)
        filesIndexedLabel = QLabel("Files indexed")
        self.filesIndexedLCD = QLCDNumber()
        self.filesIndexedLCD.setSegmentStyle(QLCDNumber.Flat)
        wordsIndexedLabel = QLabel("Words indexed")
        self.wordsIndexedLCD = QLCDNumber()
        self.wordsIndexedLCD.setSegmentStyle(QLCDNumber.Flat)
        commonWordsLCDLabel = QLabel("Common words")
        self.commonWordsLCD = QLCDNumber()
        self.commonWordsLCD.setSegmentStyle(QLCDNumber.Flat)
        self.statusLabel = QLabel("Click the 'Set Path' "
                                  "button to start indexing")
        self.statusLabel.setFrameStyle(QFrame.StyledPanel|QFrame.Sunken)

        topLayout = QHBoxLayout()
        topLayout.addWidget(pathLabel)
        topLayout.addWidget(self.pathLabel, 1)
        topLayout.addWidget(self.pathButton)
        topLayout.addWidget(findLabel)
        topLayout.addWidget(self.findEdit, 1)
        leftLayout = QVBoxLayout()
        leftLayout.addWidget(filesLabel)
        leftLayout.addWidget(self.filesListWidget)
        rightLayout = QVBoxLayout()
        rightLayout.addWidget(commonWordsLabel)
        rightLayout.addWidget(self.commonWordsListWidget)
        middleLayout = QHBoxLayout()
        middleLayout.addLayout(leftLayout, 1)
        middleLayout.addLayout(rightLayout)
        bottomLayout = QHBoxLayout()
        bottomLayout.addWidget(filesIndexedLabel)
        bottomLayout.addWidget(self.filesIndexedLCD)
        bottomLayout.addWidget(wordsIndexedLabel)
        bottomLayout.addWidget(self.wordsIndexedLCD)
        bottomLayout.addWidget(commonWordsLCDLabel)
        bottomLayout.addWidget(self.commonWordsLCD)
        bottomLayout.addStretch()
        layout = QVBoxLayout()
        layout.addLayout(topLayout)
        layout.addLayout(middleLayout)
        layout.addLayout(bottomLayout)
        layout.addWidget(self.statusLabel)
        self.setLayout(layout)

        # walkers[i] is the i-th thread of the current run and
        # completed[i] its "has reported finished" flag.
        self.walkers = []
        self.completed = []
        self.pathButton.clicked.connect(self.setPath)
        self.findEdit.returnPressed.connect(self.find)
        self.setWindowTitle("Page Indexer")

    def stopWalkers(self):
        """Stop every running walker, wait for it, and forget them all."""
        # Two passes: flag all threads first so they wind down in
        # parallel, then block on each one in turn.
        for thread in self.walkers:
            if isAlive(thread) and thread.isRunning():
                thread.stop()
        for thread in self.walkers:
            if isAlive(thread) and thread.isRunning():
                thread.wait()
        self.walkers = []
        self.completed = []

    def setPath(self):
        """Ask for a directory and start indexing the HTML files below it.

        Any running walkers are stopped first. If the dialog is cancelled
        the initial hint is restored and nothing else changes.
        """
        self.stopWalkers()
        self.pathButton.setEnabled(False)
        path = QFileDialog.getExistingDirectory(self,
                "Choose a Path to Index", self.path)
        if not path:
            self.statusLabel.setText("Click the 'Set Path' "
                                     "button to start indexing")
            self.pathButton.setEnabled(True)
            return
        self.statusLabel.setText("Scanning directories...")
        QApplication.processEvents() # Needed for Windows
        self.path = QDir.toNativeSeparators(path)
        self.findEdit.setFocus()
        self.pathLabel.setText(self.path)
        self.statusLabel.clear()
        self.filesListWidget.clear()
        # The index is rebuilt from scratch below, so the common-words
        # list from a previous run would be stale.
        self.commonWordsListWidget.clear()
        self.fileCount = 0
        self.filenamesForWords = collections.defaultdict(set)
        self.commonWords = set()

        nofilesfound = True
        files = []
        index = 0
        for root, dirs, fnames in os.walk(str(self.path)):
            for name in fnames:
                if not name.endswith((".htm", ".html")):
                    continue
                files.append(os.path.join(root, name))
                if len(files) == 1000:
                    # Hand a full batch to its own walker; ``files`` is
                    # rebound to a new list, so no copy is needed.
                    self.processFiles(index, files)
                    files = []
                    index += 1
                    nofilesfound = False
        if files:
            self.processFiles(index, files)
            nofilesfound = False
        if nofilesfound:
            self.finishedIndexing()
            self.statusLabel.setText(
                    "No HTML files found in the given path")

    def processFiles(self, index, files):
        """Create, register and start a walker for one batch of files.

        Args:
            index: position of this walker in ``self.walkers`` and
                ``self.completed``; the walker passes it back in its
                signals.
            files: list of filenames the walker should index.
        """
        thread = walker.Walker(index, self.lock, files,
                self.filenamesForWords, self.commonWords, self)
        thread.indexed[str,int].connect(self.indexed)
        thread.finished[bool,int].connect(self.finished)
        thread.finished.connect(thread.deleteLater)
        self.walkers.append(thread)
        self.completed.append(False)
        thread.start()
        thread.wait(300) # Needed for Windows

    def find(self):
        """List the indexed files containing the word in the find box.

        Only the first whitespace-separated token is used, lower-cased.
        Empty or whitespace-only input just shows a hint.
        """
        # Tokenise up front: split() on whitespace-only text yields no
        # tokens, which previously caused an IndexError on ``[0]``.
        tokens = str(self.findEdit.text()).lower().split()
        if not tokens:
            self.mutex.lock()
            try:
                self.statusLabel.setText("Enter a word to find in files")
            finally:
                self.mutex.unlock()
            return
        word = tokens[0]

        self.mutex.lock()
        try:
            self.statusLabel.clear()
            self.filesListWidget.clear()
        finally:
            self.mutex.unlock()

        self.lock.lockForRead()
        try:
            found = word in self.commonWords
        finally:
            self.lock.unlock()
        if found:
            self.mutex.lock()
            try:
                self.statusLabel.setText("Common words like '{0}' "
                        "are not indexed".format(word))
            finally:
                self.mutex.unlock()
            return

        self.lock.lockForRead()
        try:
            # get() rather than [] so that a look-up never inserts an
            # empty entry into the defaultdict; copy so the set can be
            # used after the lock is released.
            files = self.filenamesForWords.get(word, set()).copy()
        finally:
            self.lock.unlock()
        if not files:
            self.mutex.lock()
            try:
                self.statusLabel.setText("No indexed file contains "
                        "the word '{0}'".format(word))
            finally:
                self.mutex.unlock()
            return

        files = [QDir.toNativeSeparators(name) for name in
                 sorted(files, key=str.lower)]
        self.mutex.lock()
        try:
            self.filesListWidget.addItems(files)
            self.statusLabel.setText(
                    "{0} indexed files contain the word '{1}'".format(
                    len(files), word))
        finally:
            self.mutex.unlock()

    def indexed(self, fname, index):
        """Slot for Walker.indexed: count the file and refresh the display.

        The LCD counters are refreshed on every 25th file; otherwise, on
        every 101st file, the common-words list is rebuilt.
        """
        self.mutex.lock()
        try:
            self.statusLabel.setText(fname)
            self.fileCount += 1
            count = self.fileCount
        finally:
            self.mutex.unlock()

        if count % 25 == 0:
            self.lock.lockForRead()
            try:
                indexedWordCount = len(self.filenamesForWords)
                commonWordCount = len(self.commonWords)
            finally:
                self.lock.unlock()
            self.mutex.lock()
            try:
                self.filesIndexedLCD.display(count)
                self.wordsIndexedLCD.display(indexedWordCount)
                self.commonWordsLCD.display(commonWordCount)
            finally:
                self.mutex.unlock()
        elif count % 101 == 0:
            self.lock.lockForRead()
            try:
                words = self.commonWords.copy()
            finally:
                self.lock.unlock()
            self.mutex.lock()
            try:
                self.commonWordsListWidget.clear()
                self.commonWordsListWidget.addItems(sorted(words))
            finally:
                self.mutex.unlock()

    def finished(self, completed, index):
        """Slot for Walker.finished: mark walker *index* as done.

        Once every registered walker has reported (or when no walkers are
        registered any more) the status shows "Finished" and the final
        counts are displayed. The *completed* argument is not consulted.
        """
        done = False
        if self.walkers:
            self.completed[index] = True
            if all(self.completed):
                self.mutex.lock()
                try:
                    self.statusLabel.setText("Finished")
                    done = True
                finally:
                    self.mutex.unlock()
        else:
            self.mutex.lock()
            try:
                self.statusLabel.setText("Finished")
                done = True
            finally:
                self.mutex.unlock()
        if done:
            self.finishedIndexing()

    def reject(self):
        """Stop indexing if walkers are pending; otherwise close the dialog."""
        if not all(self.completed):
            self.stopWalkers()
            self.finishedIndexing()
        else:
            self.accept()

    def closeEvent(self, event=None):
        """Make sure no walker thread outlives the dialog."""
        self.stopWalkers()

    def finishedIndexing(self):
        """Show the final counts and re-enable the path button."""
        self.filesIndexedLCD.display(self.fileCount)
        self.wordsIndexedLCD.display(len(self.filenamesForWords))
        self.commonWordsLCD.display(len(self.commonWords))
        self.pathButton.setEnabled(True)
        QApplication.processEvents() # Needed for Windows


# Start the application only when this file is run as a script, so that
# importing the module does not open a window, and hand the event loop's
# return code back to the caller as the process exit status.
if __name__ == "__main__":
    app = QApplication(sys.argv)
    form = Form()
    form.show()
    sys.exit(app.exec_())

运行结果:

python3+PyQt5实现支持多线程的页面索引器应用程序

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。