Google

星期三, 九月 24, 2008

Multiple Thread Progress Bar Control

Introduction

This article presents a control that creates threads and displays the progress and any messages sent from the thread.

Background

Running multiple threads and seeing the status of where they are has been a tricky subject for quite a while. The main thing being that you could not access anything that is being controlled by the main (UI) thread. This includes all parts of the form you are displaying and the properties and variables on it.

The .Net framework 2.0 has helped clear up some of that with the introduction of the BackgroundWorker class. This class allows you to run some code on its own thread. This allows your UI to remain responsive while the code runs.

What It Does

This control can be easily modified to be used to monitor/control batch operations that can be done simultaneously. Some examples:

  • File Transfers
  • Image Processing
  • Directory Processing
  • Screen scraping

Each thread can send its own messages back to the container. The container shows those messages in a grid as the threads are running.

The thread container actually does throttling as well. Before you start the threads, you can set the number of threads you want to run concurrently. I hope to eventually make this property dynamic so that it can be changed on the fly.

What It Does Not Do

This control is designed for items that do not update the UI other than what is already displayed. If you need to render objects and update the screen, you will need to modify the way this works. That may be another article.

How it Works

Each thread is represented in a ThreadView which inherits from UserControl. This ThreadView contains a BackgroundWorker. When the ThreadContainer.Start method is called it creates as many threads as it can (either Concurrent or TotalThreads if it is less than Concurrent). The ThreadViews are docked to the top so that they are all displayed nicely and expand to the width of the ThreadContainer. After creating the new ThreadView and displays it and starts it. As it works it raises its JobEvent event. This passes the message to the container and the ThreadContainer stores it in its DataTable.

Each ThreadView contains a JobCompleted event that notifies the ThreadContainer that it is done.

http://www.codeproject.com/KB/progress/MultiThreadProgressBar.aspx

标签: , , ,

星期四, 六月 26, 2008

[Python]用 Python 实现的线程池

为了提高程序的效率,经常要用到多线程,尤其是IO等需要等待外部响应的部分。线程的创建、销毁和调度本身是有代价的,如果一个线程的任务相对简 单,那这些时间和空间开销就不容忽视了,此时用线程池就是更好的选择,即创建一些线程然后反复利用它们,而不是在完成单个任务后就结束。

下面是用Python实现的通用的线程池代码:

  1. import Queue, threading, sys
  2. from threading import Thread
  3. import time,urllib
  4. # working thread
  5. class Worker(Thread):
  6. worker_count = 0
  7. def __init__( self, workQueue, resultQueue, timeout = 0, **kwds):
  8. Thread.__init__( self, **kwds )
  9. self.id = Worker.worker_count
  10. Worker.worker_count += 1
  11. self.setDaemon( True )
  12. self.workQueue = workQueue
  13. self.resultQueue = resultQueue
  14. self.timeout = timeout
  15. def run( self ):
  16. ''' the get-some-work, do-some-work main loop of worker threads '''
  17. while True:
  18. try:
  19. callable, args, kwds = self.workQueue.get(timeout=self.timeout)
  20. res = callable(*args, **kwds)
  21. print "worker[%2d]: %s" % (self.id, str(res) )
  22. self.resultQueue.put( res )
  23. except Queue.Empty:
  24. break
  25. except :
  26. print 'worker[%2d]' % self.id, sys.exc_info()[:2]
  27. class WorkerManager:
  28. def __init__( self, num_of_workers=10, timeout = 1):
  29. self.workQueue = Queue.Queue()
  30. self.resultQueue = Queue.Queue()
  31. self.workers = []
  32. self.timeout = timeout
  33. self._recruitThreads( num_of_workers )
  34. def _recruitThreads( self, num_of_workers ):
  35. for i in range( num_of_workers ):
  36. worker = Worker( self.workQueue, self.resultQueue, self.timeout )
  37. self.workers.append(worker)
  38. def start(self):
  39. for w in self.workers:
  40. w.start()
  41. def wait_for_complete( self):
  42. # ...then, wait for each of them to terminate:
  43. while len(self.workers):
  44. worker = self.workers.pop()
  45. worker.join( )
  46. if worker.isAlive() and not self.workQueue.empty():
  47. self.workers.append( worker )
  48. print "All jobs are are completed."
  49. def add_job( self, callable, *args, **kwds ):
  50. self.workQueue.put( (callable, args, kwds) )
  51. def get_result( self, *args, **kwds ):
  52. return self.resultQueue.get( *args, **kwds )

Worker类是一个工作线程,不断地从workQueue队列中获取需要执行的任务,执行之,并将结果写入到resultQueue中,这里的 workQueue和resultQueue都是现成安全的,其内部对各个线程的操作做了互斥。当从workQueue中获取任务超时,则线程结束。

WorkerManager负责初始化Worker线程,提供将任务加入队列和获取结果的接口,并能等待所有任务完成。

一个典型的测试例子如下,它用10个线程去下载一个固定页面的内容,实际应用时应该是执行不同的任务。

  1. def test_job(id, sleep = 0.001 ):
  2. try:
  3. urllib.urlopen('https://www.gmail.com/').read()
  4. except:
  5. print '[%4d]' % id, sys.exc_info()[:2]
  6. return id
  7. def test():
  8. import socket
  9. socket.setdefaulttimeout(10)
  10. print 'start testing'
  11. wm = WorkerManager(10)
  12. for i in range(500):
  13. wm.add_job( test_job, i, i*0.001 )
  14. wm.start()
  15. wm.wait_for_complete()
  16. print 'end testing'

完成的程序可以在这里下载

http://blogger.org.cn/blog/more.asp?name=lhwork&id=22262

标签: ,

星期五, 六月 20, 2008

A PYTHON APPLICATION WHERE THREADS WOULD HELP

Let's say that you want to check the availability of many computers on a network ... you'll use ping. But there's a problem - if you "ping" a host that's not running it takes a while to timeout, so that when you check through a whole lot of systems that aren't responding - the very time a quick response is probably needed - it can take an age.

Here's a Python program that "ping"s 10 hosts in sequence.

import os
import re
import time
import sys

lifeline = re.compile(r"(\d) received")
report = ("No response","Partial Response","Alive")

print time.ctime()

for host in range(60,70):
ip = "192.168.200."+str(host)
pingaling = os.popen("ping -q -c2 "+ip,"r")
print "Testing ",ip,
sys.stdout.flush()
while 1:
line = pingaling.readline()
if not line: break
igot = re.findall(lifeline,line)
if igot:
print report[int(igot[0])]

print time.ctime()

Running that program, it works but it's a bit slow:

[trainee@buttercup trainee]$ python alive
Mon May 9 05:22:51 2005
Testing 192.168.200.60 No response
Testing 192.168.200.61 No response
Testing 192.168.200.62 No response
Testing 192.168.200.63 No response
Testing 192.168.200.64 No response
Testing 192.168.200.65 No response
Testing 192.168.200.66 Alive
Testing 192.168.200.67 No response
Testing 192.168.200.68 No response
Testing 192.168.200.69 No response
Mon May 9 05:23:19 2005
[trainee@buttercup trainee]$

That was 28 seconds - in other words, an extra 3 seconds per unavailable host.

THE SAME APPLICATION, WRITTEN USING PYTHON THREADS

I'll write the application and test it first ... then add a few notes at the bottom.

import os
import re
import time
import sys
from threading import Thread

class testit(Thread):
def __init__ (self,ip):
Thread.__init__(self)
self.ip = ip
self.status = -1
def run(self):
pingaling = os.popen("ping -q -c2 "+self.ip,"r")
while 1:
line = pingaling.readline()
if not line: break
igot = re.findall(testit.lifeline,line)
if igot:
self.status = int(igot[0])

testit.lifeline = re.compile(r"(\d) received")
report = ("No response","Partial Response","Alive")

print time.ctime()

pinglist = []

for host in range(60,70):
ip = "192.168.200."+str(host)
current = testit(ip)
pinglist.append(current)
current.start()

for pingle in pinglist:
pingle.join()
print "Status from ",pingle.ip,"is",report[pingle.status]

print time.ctime()

And running:

[trainee@buttercup trainee]$ python kicking
Mon May 9 05:23:36 2005
Status from 192.168.200.60 is No response
Status from 192.168.200.61 is No response
Status from 192.168.200.62 is No response
Status from 192.168.200.63 is No response
Status from 192.168.200.64 is No response
Status from 192.168.200.65 is No response
Status from 192.168.200.66 is Alive
Status from 192.168.200.67 is No response
Status from 192.168.200.68 is No response
Status from 192.168.200.69 is No response
Mon May 9 05:23:39 2005
[trainee@buttercup trainee]$

3 seconds - much more acceptable than the 28 seconds that we got when we pinged the hosts one by one and waited on each.

http://www.wellho.net/solutions/python-python-threads-a-first-example.html

标签: ,

星期五, 三月 28, 2008

WIN32下DELPHI中的多线程【变量存储】(三)

线程中的变量
由于每个线程都代表了一个不同的执行路径,因此,最好有一种只限于一个线程内部使用的数据,
要实现上述目的有以下几种方式:
1、局部变量(基于栈),很简单,在你的线程函数中你定义的变量既是如此。由于每个线程都在各自的栈中,各个线程将都有一套局部变量的副本,这样,就不会相互影响。对于那些只在过程或函数的生存期有意义的变量,应当把它们声明为局部变量。
2、存储在线程对象中。还记得createthread函数中的lpparameter参数吗,它可以接受一个无类型的指针。结合本文第一章的内容,你应 该还记得,它被存储在线程内核对象的上下文结构中,你可以通过context结构中的context_integer部分的ebx来读取它的地址。
下面是一段示例代码,用来演示读取context结构,这段代码一般用不到,但它可以说明cratethread函数中的lpparameter被存储的位置

{
作者:wudi_1982
联系方式:wudi_1982@hotmail.com
转载请著名出处,本代码为演示代码,只贴出了一些关键部分
}


type
//传递给线程函数的结构和指针的声明
tinfo = record
count : integer;
x : integer;
y : integer;
end;
pinfo
= ^tinfo;

var
mythreadhad : thandle;
//一个全局变量,用来保存线程的句柄

//线程函数
function mythread(info : pointer):dword; stdcall;
var
i : integer;
begin
//根据传递来信息决定在窗口的那个位置输出什么信息
for i := 0 to pinfo(info)^.count-1 do
form1.image1.canvas.textout(pinfo(info)
^.x,pinfo(info)^.y,inttostr(i));
//freemem(info);
result := 0;
end;

//创建一个线程
procedure tform1.button4click(sender: tobject);
var
ppi : pinfo;
mythreadid : dword;
begin
//分配空间并赋初值
ppi :=allocmem(sizeof(tinfo));
ppi
^.count := 1000000;
ppi
^.x := 10;
ppi
^.y := 10;
//创建
mythreadhad := createthread(nil,0,@mythread,ppi,create_suspended,mythreadid);
//在窗体上显示线程函数的地址和传递给它的参数的地址
labthreadaddr.caption := inttostr( integer(@mythread));
labthreadpvparam.caption :
= inttostr(integer(ppi));
end;

//读取context结构,注意context结构是和cpu有关的,我这里测试时,工作在intel的cpu上
procedure tform1.btnrcontextclick(sender: tobject);
var
con : _context;
begin
//初始化结构
con.contextflags := context_full;
//读取
getthreadcontext(mythreadhad,con);
//显示在窗体的listbox上
with lbxcontextinfo.items do
begin
// clear;
add(------------context--------------);
add(
);
add(
context_debug_registers-----);
add(
dr0:+#9+inttostr(con.dr0));
add(
dr1:+#9+inttostr(con.dr1));
add(
dr2:+#9+inttostr(con.dr2));
add(
dr3:+#9+inttostr(con.dr3));
add(
dr6:+#9+inttostr(con.dr6));
add(
dr7:+#9+inttostr(con.dr7));
add(
context_segments---------);
add(
seggs:+#9+inttostr(con.seggs));
add(
segfs:+#9+inttostr(con.segfs));
add(
seges:+#9+inttostr(con.seges));
add(
segds:+#9+inttostr(con.segds));
add(
context_integer.---------);
add(
edi: +#9+inttostr(con.edi));
add(
esi: +#9+inttostr(con.esi));
add(
ebx: +#9+inttostr(con.ebx));
add(
edx: +#9+inttostr(con.edx));
add(
ecx: +#9+inttostr(con.ecx));
add(
eax: +#9+inttostr(con.eax));
add(
context_control----------);
add(
ebp: +#9+inttostr(con.ebp));
add(
eip: +#9+inttostr(con.eip));
add(
segcs: +#9+inttostr(con.segcs));
add(
eflags: +#9+inttostr(con.eflags));
add(
esp: +#9+inttostr(con.esp));
add(
segss: +#9+inttostr(con.segss));
end;

end;

把上面代码整理之后,添加到你的程序中,你可以发现(如果也是intel的cpu),那么你可以从eax寄存器读取到线程函数的地址,从ebx中读取到传递给线程函数的参数地址。
在delphi中的tthread对象的构造函数中,你可以看到这段代码
fhandle := beginthread(nil, 0, @threadproc, pointer(self), create_suspended, fthreadid);
再 观察beginthread的实现,你会发现tthread的调用createthread时,将pointer(self),也就是tthread对象 本身当作线程函数的参数传递过去,换言之,你在tthread的派生类中定义的变量,对于一个线程而言,将存储在这个线程单独的堆栈中,而它在堆栈的地址 存储在线程的上下文结构中。
可以做一个简单的试验,将一个线程生成多次,你可以发现存储在线程对象内部的变量将互不影响。
说到这里,必须谈论一个问题,效率的问题,我在一本书上曾经看到过这样一段话“由于访问线程对象中的数据比访问线程局部变量要快10倍,因此,你应当尽可 能地把线程专用的信息保存在线程对象中。”对此,我一直没有特别理解。如果一定要相信这句话,那我会这么理解,就是存储在线程对象中的变量因为上下文结构 记录了它的地址等原因,所以它更快。尽信书不如无书,我还在思考,不过好在这种速度的影响对于通常的使用而言影响不大。

3、在delphi中,用object pascal的关键字threadvar来声明变量,以利用操作系统级的线程局部存储。
在前面我们了解到:虽然对于局部变量,在每个线程中都一个副本,然而应用程序的全局变量是被所有线程所共享的。当多个线程对这个全局变量进行访问时,将可 能出现很多未知的问题,win32提供了一种称为线程局部存储的方式,它能使你在第一个运行的线程中创建一个全局变量的拷贝。delphi利用关键字 threadvar封装此功能。在threadvar关键字下你可以声明任何局部存储的变量。
4、全局变量,多线程最让人头疼的地方就是全局变量 了,好的同步方式将决定你高效、安全的访问全局变量,虽然上述的threadvar是解决全局变量线程局部存储的一个办法,但在我实际的编码工作中,几乎 很少用它,它的局限性太多。多线程访问全局变量的方法将在下一文中详细描述。

http://www.west263.com/www/info/41142-1.htm

标签: ,

辽ICP备05003652号
流风洄雪听天籁,轻云蔽日看落花

Powered by Blogger