queues: update & fixes

pull/1/head
Thorsten Liebig 2011-11-28 14:09:14 +01:00
parent eca5501c98
commit 32a759757f
5 changed files with 80 additions and 34 deletions

View File

@ -17,7 +17,18 @@ else
jobnum = 1; jobnum = 1;
end end
queue.jobs{jobnum}.finished = 0; running = numel(queue.jobs_finished) - sum(queue.jobs_finished);
while (running>=queue.maxThreads)
[queue running] = CheckQueue(queue);
end
if (queue.verbose>=1)
disp(['Add2Queue: Job #' num2str(jobnum) ' starting...']);
end
queue.jobs_finished(jobnum) = 0;
queue.jobs{jobnum}.argsfile = [tempname '.mat']; queue.jobs{jobnum}.argsfile = [tempname '.mat'];
save(queue.jobs{jobnum}.argsfile,'func_args'); save(queue.jobs{jobnum}.argsfile,'func_args');

56
matlab/CheckQueue.m Normal file
View File

@ -0,0 +1,56 @@
function [queue running] = CheckQueue(queue, query_time)
% function [queue running] = CheckQueue(queue, <query_time>)
%
% Check the given queue for finished tasks.
%
% Parameter:
% query_time (optional): time interval to check for finished tasks
% (in seconds, default is 5)
%
% For more details see: InitQueue
%
% See also: InitQueue, ResultsQueue, Add2Queue, RunOpenEMS
%
% openEMS matlab interface
% -----------------------
% author: Thorsten Liebig
if ~isfield(queue,'jobs')
running = 0;
return
end
if (nargin<2)
query_time = 5;
end
numJobs = numel(queue.jobs);
pause(query_time);
for n=1:numJobs
if (queue.jobs_finished(n)==0)
if (queue_checkProcess( queue.jobs{n}.pid, queue.jobs{n}.filenames)==0)
queue.jobs_finished(n)=1;
load(queue.jobs{n}.outargsfile);
queue.jobs{n}.outargs = outargs;
% read in output and cleanup
[queue.jobs{n}.stdout,queue.jobs{n}.stderr] = queue_delProcess( queue.jobs{n}.pid, queue.jobs{n}.filenames );
% cleanup
delete( queue.jobs{n}.argsfile );
clear queue.jobs{n}.argsfile;
delete( queue.jobs{n}.outargsfile );
clear queue.jobs{n}.outargsfile;
queue.jobs_finished(n) = 1;
if (queue.verbose>=1)
disp(['CheckQueue: Job #' num2str(n) ' is finished!']);
end
end
end
end
running = numel(queue.jobs_finished) - sum(queue.jobs_finished);

View File

@ -25,41 +25,14 @@ end
numJobs = numel(queue.jobs); numJobs = numel(queue.jobs);
for n=1:numel(numJobs)
is_done = queue.jobs{n}.finished;
end
if (queue.verbose>=1) if (queue.verbose>=1)
disp(['FinishQueue: Waiting for ' num2str(sum(~is_done)) ' of ' num2str(numJobs) ' jobs to finish...']); disp(['FinishQueue: Waiting for ' num2str(sum(~queue.jobs_finished)) ' of ' num2str(numJobs) ' jobs to finish...']);
end end
while sum(is_done)<numJobs running = numel(queue.jobs_finished) - sum(queue.jobs_finished);
pause(query_time);
for n=1:numel(numJobs) while sum(running)>0
if (is_done(n)==0) [queue running] = CheckQueue(queue, query_time);
if (queue_checkProcess( queue.jobs{n}.pid, queue.jobs{n}.filenames)==0)
queue.jobs{n}.finished=1;
load(queue.jobs{n}.outargsfile);
queue.jobs{n}.outargs = outargs;
% read in output and cleanup
[queue.jobs{n}.stdout,queue.jobs{n}.stderr] = queue_delProcess( queue.jobs{n}.pid, queue.jobs{n}.filenames );
% cleanup
delete( queue.jobs{n}.argsfile );
clear queue.jobs{n}.argsfile;
delete( queue.jobs{n}.outargsfile );
clear queue.jobs{n}.outargsfile;
is_done(n) = 1;
if (queue.verbose>=1)
disp(['FinishQueue: Job #' num2str(n) ' is finished!']);
end
end
end
end
end end
if (queue.verbose>=1) if (queue.verbose>=1)

View File

@ -55,6 +55,7 @@ queue.use_octave = 1;
queue.verbose = 1; queue.verbose = 1;
queue.maxThreads = Inf;
% add current path % add current path
queue.DependPath = ['addpath(''' pwd ''');']; queue.DependPath = ['addpath(''' pwd ''');'];
@ -68,6 +69,9 @@ for n=1:2:nargin
if strcmp(varargin{n},'UseOctave'); if strcmp(varargin{n},'UseOctave');
queue.use_octave = varargin{n+1}; queue.use_octave = varargin{n+1};
end end
if strcmp(varargin{n},'MaxThreads');
queue.maxThreads = varargin{n+1};
end
end end
@ -79,3 +83,5 @@ else
queue.bin = [matlabroot '/bin/matlab']; queue.bin = [matlabroot '/bin/matlab'];
queue.bin_options = [' -nodesktop -nosplash -r']; queue.bin_options = [' -nodesktop -nosplash -r'];
end end
queue.jobs_finished = [];

View File

@ -19,6 +19,6 @@ if (nargout>numel(queue.jobs{n}.outargs))
error 'not enough job output arguments' error 'not enough job output arguments'
end end
for n=1:numel(queue.jobs{n}.outargs) for k=1:numel(queue.jobs{n}.outargs)
varargout{n} = queue.jobs{n}.outargs{n}; varargout{k} = queue.jobs{n}.outargs{k};
end end