diff --git a/matlab/Add2Queue.m b/matlab/Add2Queue.m index 4339b7c..f9de813 100644 --- a/matlab/Add2Queue.m +++ b/matlab/Add2Queue.m @@ -17,7 +17,18 @@ else jobnum = 1; end -queue.jobs{jobnum}.finished = 0; +running = numel(queue.jobs_finished) - sum(queue.jobs_finished); + +while (running>=queue.maxThreads) + [queue running] = CheckQueue(queue); +end + + +if (queue.verbose>=1) + disp(['Add2Queue: Job #' num2str(jobnum) ' starting...']); +end + +queue.jobs_finished(jobnum) = 0; queue.jobs{jobnum}.argsfile = [tempname '.mat']; save(queue.jobs{jobnum}.argsfile,'func_args'); diff --git a/matlab/CheckQueue.m b/matlab/CheckQueue.m new file mode 100644 index 0000000..bd9747c --- /dev/null +++ b/matlab/CheckQueue.m @@ -0,0 +1,56 @@ +function [queue running] = CheckQueue(queue, query_time) +% function [queue running] = CheckQueue(queue, ) +% +% Check the given queue for finished tasks. +% +% Parameter: +% query_time (optional): time interval to check for finished tasks +% (in seconds, default is 5) +% +% For more details see: InitQueue +% +% See also: InitQueue, ResultsQueue, Add2Queue, RunOpenEMS +% +% openEMS matlab interface +% ----------------------- +% author: Thorsten Liebig + +if ~isfield(queue,'jobs') + running = 0; + return +end + +if (nargin<2) + query_time = 5; +end + +numJobs = numel(queue.jobs); + +pause(query_time); + +for n=1:numJobs + if (queue.jobs_finished(n)==0) + if (queue_checkProcess( queue.jobs{n}.pid, queue.jobs{n}.filenames)==0) + queue.jobs_finished(n)=1; + load(queue.jobs{n}.outargsfile); + queue.jobs{n}.outargs = outargs; + + % read in output and cleanup + [queue.jobs{n}.stdout,queue.jobs{n}.stderr] = queue_delProcess( queue.jobs{n}.pid, queue.jobs{n}.filenames ); + + % cleanup + delete( queue.jobs{n}.argsfile ); + clear queue.jobs{n}.argsfile; + delete( queue.jobs{n}.outargsfile ); + clear queue.jobs{n}.outargsfile; + + queue.jobs_finished(n) = 1; + + if (queue.verbose>=1) + disp(['CheckQueue: Job #' num2str(n) ' is finished!']); + end + end + end +end + +running = numel(queue.jobs_finished) - sum(queue.jobs_finished); diff --git a/matlab/FinishQueue.m b/matlab/FinishQueue.m index 97203b4..10020c5 100644 --- a/matlab/FinishQueue.m +++ b/matlab/FinishQueue.m @@ -25,41 +25,14 @@ end numJobs = numel(queue.jobs); -for n=1:numel(numJobs) - is_done = queue.jobs{n}.finished; -end - if (queue.verbose>=1) - disp(['FinishQueue: Waiting for ' num2str(sum(~is_done)) ' of ' num2str(numJobs) ' jobs to finish...']); + disp(['FinishQueue: Waiting for ' num2str(sum(~queue.jobs_finished)) ' of ' num2str(numJobs) ' jobs to finish...']); end -while sum(is_done)=1) - disp(['FinishQueue: Job #' num2str(n) ' is finished!']); - end - end - end - end +while sum(running)>0 + [queue running] = CheckQueue(queue, query_time); end if (queue.verbose>=1) diff --git a/matlab/InitQueue.m b/matlab/InitQueue.m index 1895ec9..d78ffa0 100644 --- a/matlab/InitQueue.m +++ b/matlab/InitQueue.m @@ -55,6 +55,7 @@ queue.use_octave = 1; queue.verbose = 1; +queue.maxThreads = Inf; % add current path queue.DependPath = ['addpath(''' pwd ''');']; @@ -68,6 +69,9 @@ for n=1:2:nargin if strcmp(varargin{n},'UseOctave'); queue.use_octave = varargin{n+1}; end + if strcmp(varargin{n},'MaxThreads'); + queue.maxThreads = varargin{n+1}; + end end @@ -79,3 +83,5 @@ else queue.bin = [matlabroot '/bin/matlab']; queue.bin_options = [' -nodesktop -nosplash -r']; end + +queue.jobs_finished = []; diff --git a/matlab/ResultsQueue.m b/matlab/ResultsQueue.m index 03c3c28..cc9f745 100644 --- a/matlab/ResultsQueue.m +++ b/matlab/ResultsQueue.m @@ -19,6 +19,6 @@ if (nargout>numel(queue.jobs{n}.outargs)) error 'not enough job output arguments' end -for n=1:numel(queue.jobs{n}.outargs) - varargout{n} = queue.jobs{n}.outargs{n}; +for k=1:numel(queue.jobs{n}.outargs) + varargout{k} = queue.jobs{n}.outargs{k}; end