matlab: Queue interface to run scripts (e.g. sweeps) in parallel

pull/1/head
Thorsten Liebig 2011-11-16 13:37:44 +01:00
parent 489a45a666
commit 7ef80834cb
6 changed files with 294 additions and 2 deletions

34
matlab/Add2Queue.m Normal file
View File

@ -0,0 +1,34 @@
function [queue] = Add2Queue(queue,func_name, func_args, varargin)
% function [queue] = Add2Queue(queue,func_name, func_args, varargin)
%
% Use this function to add a funtion to the queue.
%
% For more details see: InitQueue
%
% See also: InitQueue, FinishQueue, ResultsQueue, RunOpenEMS
%
% openEMS matlab interface
% -----------------------
% author: Thorsten Liebig
if isfield(queue,'jobs')
jobnum = numel(queue.jobs)+1;
else
jobnum = 1;
end
queue.jobs{jobnum}.finished = 0;
queue.jobs{jobnum}.argsfile = [tempname '.mat'];
save(queue.jobs{jobnum}.argsfile,'func_args');
queue.jobs{jobnum}.nargout = nargout(func_name);
queue.jobs{jobnum}.outargsfile = [tempname '.mat'];
queue.jobs{jobnum}.command = [queue.bin queue.bin_options ' "load(''' queue.jobs{jobnum}.argsfile ''');' ...
queue.DependPath ...
'[outargs{1:' num2str(queue.jobs{jobnum}.nargout) '}]=' func_name '(func_args{:});' ...
'save(''-V7'',''' queue.jobs{jobnum}.outargsfile ''',''outargs'');' ...
'exit;"'];
[queue.jobs{jobnum}.pid, queue.jobs{jobnum}.filenames] = queue_addProcess( queue.jobs{jobnum}.command );

67
matlab/FinishQueue.m Normal file
View File

@ -0,0 +1,67 @@
function [queue] = FinishQueue(queue, query_time)
% function [queue] = FinishQueue(queue, <query_time>)
%
% Wait for the given queue to finish.
%
% Parameter:
% query_time (optional): time interval to check for finished tasks
% (in seconds, default is 5)
%
% For more details see: InitQueue
%
% See also: InitQueue, ResultsQueue, Add2Queue, RunOpenEMS
%
% openEMS matlab interface
% -----------------------
% author: Thorsten Liebig
if ~isfield(queue,'jobs')
return
end
if (nargin<2)
query_time = 5;
end
numJobs = numel(queue.jobs);
for n=1:numel(numJobs)
is_done = queue.jobs{n}.finished;
end
if (queue.verbose>=1)
disp(['FinishQueue: Waiting for ' num2str(sum(~is_done)) ' of ' num2str(numJobs) ' jobs to finish...']);
end
while sum(is_done)<numJobs
pause(query_time);
for n=1:numel(numJobs)
if (is_done(n)==0)
if (queue_checkProcess( queue.jobs{n}.pid, queue.jobs{n}.filenames)==0)
queue.jobs{n}.finished=1;
load(queue.jobs{n}.outargsfile);
queue.jobs{n}.outargs = outargs;
% read in output and cleanup
[queue.jobs{n}.stdout,queue.jobs{n}.stderr] = queue_delProcess( queue.jobs{n}.pid, queue.jobs{n}.filenames );
% cleanup
delete( queue.jobs{n}.argsfile );
clear queue.jobs{n}.argsfile;
delete( queue.jobs{n}.outargsfile );
clear queue.jobs{n}.outargsfile;
is_done(n) = 1;
if (queue.verbose>=1)
disp(['FinishQueue: Job #' num2str(n) ' is finished!']);
end
end
end
end
end
if (queue.verbose>=1)
disp(['FinishQueue: All jobs done!'])
end

81
matlab/InitQueue.m Normal file
View File

@ -0,0 +1,81 @@
function [queue] = InitQueue(varargin)
% function [queue] = InitQueue(varargin)
%
% Use this function to initialize a queue to run one or more matlab scripts
% in parallel.
% This can be used to efficiently run an openEMS parameter sweep in parallel
% on multiple remote machines.
%
% Note:
% - Currently only Linux/Unix is supported
% - By default Octave is used to spawn parallel functions (saves
% licenses), but this can be changed by:
% [queue] = InitQueue('UseOctave', 0);
% You may need to change this, if your script is not octave compatible
% - To efficiently run openEMS in parallel, you need to run it on several
% machines using a SSH.host_list setting --> See also RunOpenEMS
%
% Example:
% %serial version:
% for n=1:10
% % manipulate parameter etc.
% [result1(n) result2(n)] = Parallel_Func_Name(param1, param2);
% end
%
% %parallel version:
% queue = InitQueue('DependPath',{'/opt/openEMS/CSXCAD/matlab', ...
% '/opt/openEMS/openEMS/matlab'});
% for n=1:10
% % manipulate parameter etc.
% queue = Add2Queue(queue, 'Parallel_Func_Name', {param1, param2});
% end
%
% % wait for all to finish
% [queue] = FinishQueue(queue);
%
% % retrieve result
% for n=1:numel(stub_sweep)
% [result1(n) result2(n)] = ResultsQueue(queue,n);
% end
%
% See also: Add2Queue, FinishQueue, ResultsQueue, RunOpenEMS,
% RunOpenEMS_Parallel, FindFreeSSH
%
% openEMS matlab interface
% -----------------------
% author: Thorsten Liebig
if ~isunix
error 'your OS is not supported (Unix only)'
end
% use octave as default to save matlab (floating) licenses
% otherwise many matlab instances are spawned
queue.use_octave = 1;
queue.verbose = 1;
% add current path
queue.DependPath = ['addpath(''' pwd ''');'];
for n=1:2:nargin
if strcmp(varargin{n},'DependPath');
for m=1:numel(varargin{n+1})
queue.DependPath = [queue.DependPath 'addpath(''' varargin{n+1}{m} ''');'];
end
end
if strcmp(varargin{n},'UseOctave');
queue.use_octave = varargin{n+1};
end
end
% set binaries and options
if (queue.use_octave)
queue.bin = ['export LD_LIBRARY_PATH=""; octave'];
queue.bin_options = [' --silent --eval'];
else
queue.bin = [matlabroot '/bin/matlab'];
queue.bin_options = [' -nodesktop -nosplash -r'];
end

24
matlab/ResultsQueue.m Normal file
View File

@ -0,0 +1,24 @@
function [varargout] = ResultsQueue(queue, n)
% function [varargout] = ResultsQueue(queue, n)
%
% Use this function to retrieve the results from a finished queue.
%
% For more details see: InitQueue
%
% See also: InitQueue, FinishQueue, Add2Queue, RunOpenEMS
%
% openEMS matlab interface
% -----------------------
% author: Thorsten Liebig
if n>numel(queue.jobs)
error 'ResultsQueue:job is missing'
end
if (nargout>numel(queue.jobs{n}.outargs))
error 'not enough job output arguments'
end
for n=1:numel(queue.jobs{n}.outargs)
varargout{n} = queue.jobs{n}.outargs{n};
end

View File

@ -0,0 +1,86 @@
function [stdout, stderr] = RunOpenEMS_Parallel(Sim_Paths, Sim_Files, opts, Settings)
% function [stdout, stderr] = RunOpenEMS_Parallel(Sim_Paths, Sim_Files, opts, Settings)
%
% Run multiple openEMS simulations in parallel, distributed on multiple
% machines using a ssh host_list! (currently on Linux only)
%
% This function relies on InitQueue etc.
%
% input:
% Sim_Paths: cell array of pathes to simulate by RunOpenEMS
% Sim_Files: filename or cell array of filenames to simulate
% opts: openEMS options. sa RunOpenEMS
% Settings: use the settings to define multiple host for simulation
% e.g.: Settings.SSH.bin ='<path_to_openEMS>/openEMS.sh';
% Settings.SSH.host_list = {'list','of','hosts'};
%
% Note: If no SSH host_list is defined, this function will skip the
% parallel run and switch back to a default RunOpenEMS!
%
% See also RunOpenEMS, FindFreeSSH, InitQueue
%
% openEMS matlab interface
% -----------------------
% author: Thorsten Liebig 2011
pause_queue = 1; %pause between consecutive runs (needed for FindFreeSSH)
skip_parallel = 0;
% currently only supporting linux, run conventional RunOpenEMS
if ~isunix ||
warning 'your OS is not supported (Unix only), running default RunOpenEMS';
skip_parallel = 1;
end
% in case only one path is given, run conventional RunOpenEMS
if ischar(Sim_Paths)
warning 'only a single path given, running default RunOpenEMS'
skip_parallel = 1;
end
% in case SSH.host_list is not defined, run conventional RunOpenEMS
if ~isfield(Settings,'SSH')
warning 'SSH options missing, running default RunOpenEMS'
skip_parallel = 1;
elseif ~isfield(Settings.SSH,'host_list')
warning 'SSH.host_list option missing, running default RunOpenEMS'
skip_parallel = 1;
end
if (skip_parallel)
RunOpenEMS(Sim_Paths, Sim_Files, opts, Settings)
stdout = [];
stderr = [];
return
end
if ~iscell(Sim_Paths)
error('RunOpenEMS_Parallel:needs a cell array of Sim_Paths to simulate');
end
% get the path to this file
[dir] = fileparts( mfilename('fullpath') );
queue = InitQueue('DependPath',dir);
% spawn multiple simulations
numSims = numel(Sim_Paths);
for n=1:numSims
if iscell(Sim_Files)
Sim_File = Sim_Files{n};
else
Sim_File = Sim_Files;
end
queue = Add2Queue(queue,'RunOpenEMS',{Sim_Paths{1}, Sim_File, opts, Settings});
disp(['openEMS simulation #' int2str(n) ' in directory: ' Sim_Paths{n} ' started!']);
pause(pause_queue);
end
[queue] = FinishQueue(queue);
for n=1:numel(Sim_Paths)
stdout{n} = queue.jobs{n}.stdout;
stderr{n} = queue.jobs{n}.stderr;
end

View File

@ -10,12 +10,12 @@ end
if nargout > 1
fid = fopen( filenames.stdout );
stdout = fread(fid);
stdout = fread(fid, '*char')';
fclose(fid);
end
if nargout > 2
fid = fopen( filenames.stderr );
stderr = fread(fid);
stderr = fread(fid, '*char')';
fclose(fid);
end