博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
kaldi 源码分析(三) - run.pl 分析
阅读量:6002 次
发布时间:2019-06-20

本文共 11582 字,大约阅读时间需要 38 分钟。

在 kaldi 样本(egs/xxx/s5)目录下,大都会存在如下文件及文件夹:

cmd.sh                     # 并行执行命令,通常分 run.pl, queue.pl 两种config                       # 参数定制化配置文件, mfcc, decode, cmvn 等配置文件local                         # 工程定制化内容path.sh                    # 环境变量相关脚本run.sh                      # 整体流程控制脚本,主入口脚本steps                       # 存放单一步骤执行的脚本utils                         # 存放解析文件,预处理等相关工具的脚本

对于 kaldi 这套系统而言,queue.pl 主要对 SUN 公司的 GridEngine 相对而言友好一些,对于单机执行,通常在 cmd.sh 中配置为 run.pl 即单机多进程执行。

train_cmd="utils/run.pl"decode_cmd="utils/run.pl"

如上述目录结构分析类似, 与并行计算相关的内容均在 utils/parallel 目录下

这里简单分析下 run.pl 文件, 如下:

#!/usr/bin/env perluse warnings; #sed replacement for -w perl parameter# 主要介绍了两种方式执行:#    * 常见用法:#             run.pl some.log a b c#        即在 bash 环境中执行 a b c 命令,并将日志输出到 some.log 文件中#    * 并行任务:#             run.pl JOB=1:4 some.JOB.log  a b c JOB#        即在 bash 环境中执行 a b c JOB 命令,并将日志输出到 some.JOB.log 文件中, 其中 JOB 表示执行任务的名称, 任意一个 Job 失败,整体失败。# In general, doing#  run.pl some.log a b c is like running the command a b c in# the bash shell, and putting the standard error and output into some.log.# To run parallel jobs (backgrounded on the host machine), you can do (e.g.)#  run.pl JOB=1:4 some.JOB.log a b c JOB is like running the command a b c JOB# and putting it in some.JOB.log, for each one. [Note: JOB can be any identifier].# If any of the jobs fails, this script will fail.# 典型样例如下:#        run.pl some.log my-prog "--opt=foo bar" foo \| other-prog baz# 即其与在 bash 中执行 如下命令类似: #        ( my-prog '--opt=foo bar' foo |  other-prog baz ) >& some.log## A typical example is:#  run.pl some.log my-prog "--opt=foo bar" foo \|  other-prog baz# and run.pl will run something like:# ( my-prog '--opt=foo bar' foo |  other-prog baz ) >& some.log## Basically it takes the command-line arguments, quotes them# as necessary to preserve spaces, and evaluates them with bash.# In addition it puts the command line at the top of the log, and# the start and end times of the command at the beginning and end.# The reason why this is useful is so that we can create a different# version of this program that uses a queueing system instead.# use Data::Dumper;# 参数少于 2 则退出程序并输出帮助信息@ARGV < 2 && die "usage: run.pl log-file command-line arguments...";# 对多可执行的 jobs 数据量$max_jobs_run = -1;# job 起止量$jobstart = 1;$jobend = 1;$ignored_opts = ""; # These will be ignored.# 首先解析 JOB=1:4 类似选项# First parse an option like JOB=1:4, and any# options that would normally be given to# queue.pl, which we will just discard.# 通过循环方式取出前两个参数for (my $x = 1; $x <= 2; $x++) { # This for-loop is to  # allow the JOB=1:n option to be interleaved with the  # options to qsub.  while (@ARGV >= 2 && $ARGV[0] =~ m:^-:) {    # parse any options that would normally go to qsub, but which will be ignored here.   # 取出选项内容    my $switch = shift @ARGV;    if ($switch eq "-V") {      $ignored_opts .= "-V ";    } elsif ($switch eq "--max-jobs-run" || $switch eq "-tc") {      # 获取最大 jobs 数量      # we do support the option --max-jobs-run n, and its GridEngine form -tc n.      $max_jobs_run = shift @ARGV;      if (! ($max_jobs_run > 0)) {        die "run.pl: invalid option --max-jobs-run $max_jobs_run";      }    } else {      my $argument = shift @ARGV;      if ($argument =~ m/^--/) {        print STDERR "run.pl: WARNING: suspicious argument '$argument' to $switch; starts with '-'\n";      }      if ($switch eq "-sync" && $argument =~ m/^[yY]/) {        # 获取同步选项        $ignored_opts .= "-sync "; # Note: in the        # corresponding code in queue.pl it says instead, just "$sync = 1;".      } elsif ($switch eq "-pe") { # e.g. -pe smp 5        # 获取 -pe 选项        my $argument2 = shift @ARGV;        $ignored_opts .= "$switch $argument $argument2 ";      } elsif ($switch eq "--gpu") {        # 获取 gpu 选项        $using_gpu = $argument;      } else {        # Ignore option.        $ignored_opts .= "$switch $argument ";      }    }  }  # 用来匹配 JOB=1:20 选项, 并将其分别放置到 jobname, jobstart, jobend 当中  if ($ARGV[0] =~ m/^([\w_][\w\d_]*)+=(\d+):(\d+)$/) { # e.g. JOB=1:20    $jobname = $1;    $jobstart = $2;    $jobend = $3;    shift;    if ($jobstart > $jobend) {      die "run.pl: invalid job range $ARGV[0]";    }    if ($jobstart <= 0) {      die "run.pl: invalid job range $ARGV[0], start must be strictly positive (this is required for GridEngine compatibility).";    }  # 用来匹配 JOB=1 选项, 并将其分别放置到 jobname, jobstart, jobend 当中  } elsif ($ARGV[0] =~ m/^([\w_][\w\d_]*)+=(\d+)$/) { # e.g. JOB=1.    $jobname = $1;    $jobstart = $2;    $jobend = $2;    shift;  } elsif ($ARGV[0] =~ m/.+\=.*\:.*$/) {    print STDERR "run.pl: Warning: suspicious first argument to run.pl: $ARGV[0]\n";  }}# Users found this message confusing so we are removing it.# if ($ignored_opts ne "") {#   print STDERR "run.pl: Warning: ignoring options \"$ignored_opts\"\n";# }# 对 max_jobs_run 进行设置默认值if ($max_jobs_run == -1) { # If --max-jobs-run option not set,                           # then work out the number of processors if possible,                           # and set it based on that.  $max_jobs_run = 0;  if ($using_gpu) {# 当使用 GPU 时,通过执行 nvidia-smi -L 命令通过读取行数来获取最大可执行Job数量    if (open(P, "nvidia-smi -L |")) {      $max_jobs_run++ while (

); close(P); }# 没有 GPU 时,输出错误信息 if ($max_jobs_run == 0) { $max_jobs_run = 1; print STDERR "run.pl: Warning: failed to detect number of GPUs from nvidia-smi, using ${max_jobs_run}\n"; } } elsif (open(P, ") { if (m/^processor/) { $max_jobs_run++; } } if ($max_jobs_run == 0) { print STDERR "run.pl: Warning: failed to detect any processors from /proc/cpuinfo\n"; $max_jobs_run = 10; # reasonable default. } close(P); } elsif (open(P, "sysctl -a |")) { # BSD/Darwin# 在 BSD/Darwin 系统下,通过 sysctl -a 命令匹配 hw\.ncpu\s*[:=]\s*(\d+) 出CPU数量并传递给最大jobs数量 while (

) { if (m/hw\.ncpu\s*[:=]\s*(\d+)/) { # hw.ncpu = 4, or hw.ncpu: 4 $max_jobs_run = $1; last; } } close(P); if ($max_jobs_run == 0) { print STDERR "run.pl: Warning: failed to detect any processors from sysctl -a\n"; $max_jobs_run = 10; # reasonable default. } } else { # 对于非 UNIX 系统下,设置默认 32 个,可根据实际情况进行修改 # allow at most 32 jobs at once, on non-UNIX systems; change this code # if you need to change this default. $max_jobs_run = 32; } # The just-computed value of $max_jobs_run is just the number of processors # (or our best guess); and if it happens that the number of jobs we need to # run is just slightly above $max_jobs_run, it will make sense to increase # $max_jobs_run to equal the number of jobs, so we don't have a small number # of leftover jobs. $num_jobs = $jobend - $jobstart + 1; if (!$using_gpu && $num_jobs > $max_jobs_run && $num_jobs < 1.4 * $max_jobs_run) { $max_jobs_run = $num_jobs; }}# 配置日志文件$logfile = shift @ARGV;if (defined $jobname && $logfile !~ m/$jobname/ && $jobend > $jobstart) { print STDERR "run.pl: you are trying to run a parallel job but " . "you are putting the output into just one log file ($logfile)\n"; exit(1);}$cmd = "";# 重新组合指令内容foreach $x (@ARGV) { if ($x =~ m/^\S+$/) { $cmd .= $x . " "; } elsif ($x =~ m:\":) { $cmd .= "'$x' "; } else { $cmd .= "\"$x\" "; }}#$Data::Dumper::Indent=0;$ret = 0;$numfail = 0;%active_pids=();# 按 jobstart 和 jobend 创建进程,并执行相应的命令use POSIX ":sys_wait_h";for ($jobid = $jobstart; $jobid <= $jobend; $jobid++) { if (scalar(keys %active_pids) >= $max_jobs_run) { # Lets wait for a change in any child's status # Then we have to work out which child finished $r = waitpid(-1, 0); $code = $?; if ($r < 0 ) { die "run.pl: Error waiting for child process"; } # should never happen. if ( defined $active_pids{$r} ) { $jid=$active_pids{$r}; $fail[$jid]=$code; if ($code !=0) { $numfail++;} delete $active_pids{$r}; # print STDERR "Finished: $r/$jid " . Dumper(\%active_pids) . "\n"; } else { die "run.pl: Cannot find the PID of the chold process that just finished."; } # In theory we could do a non-blocking waitpid over all jobs running just # to find out if only one or more jobs finished during the previous waitpid() # However, we just omit this and will reap the next one in the next pass # through the for(;;) cycle } $childpid = fork(); if (!defined $childpid) { die "run.pl: Error forking in run.pl (writing to $logfile)"; } if ($childpid == 0) { # We're in the child... this branch # executes the job and returns (possibly with an error status). if (defined $jobname) { $cmd =~ s/$jobname/$jobid/g; $logfile =~ s/$jobname/$jobid/g; } # 创建日志目录 system("mkdir -p `dirname $logfile` 2>/dev/null"); # 打开日志文件 open(F, ">$logfile") || die "run.pl: Error opening log file $logfile"; # 输出执行命令,及时间 print F "# " . $cmd . "\n"; print F "# Started at " . `date`; $starttime = `date +'%s'`; print F "#\n"; close(F); # 开始执行命令 # Pipe into bash.. make sure we're not using any other shell. open(B, "|bash") || die "run.pl: Error opening shell command"; print B "( " . $cmd . ") 2>>$logfile >> $logfile"; close(B); # If there was an error, exit status is in $? $ret = $?; $lowbits = $ret & 127; $highbits = $ret >> 8; if ($lowbits != 0) { $return_str = "code $highbits; signal $lowbits" } else { $return_str = "code $highbits"; } # 输出命令结束时间 $endtime = `date +'%s'`; open(F, ">>$logfile") || die "run.pl: Error opening log file $logfile (again)"; $enddate = `date`; chop $enddate; print F "# Accounting: time=" . ($endtime - $starttime) . " threads=1\n"; print F "# Ended ($return_str) at " . $enddate . ", elapsed time " . ($endtime-$starttime) . " seconds\n"; close(F); exit($ret == 0 ? 0 : 1); } else { $pid[$jobid] = $childpid; $active_pids{$childpid} = $jobid; # print STDERR "Queued: " . Dumper(\%active_pids) . "\n"; }}# 等待所有进程结束并判定其是否执行成功# Now we have submitted all the jobs, lets wait until all the jobs finishforeach $child (keys %active_pids) { $jobid=$active_pids{$child}; $r = waitpid($pid[$jobid], 0); $code = $?; if ($r == -1) { die "run.pl: Error waiting for child process"; } # should never happen. if ($r != 0) { $fail[$jobid]=$code; $numfail++ if $code!=0; } # Completed successfully}# 判定每个任务结果是否执行成功# Some sanity checks:# The $fail array should not contain undefined codes# The number of non-zeros in that array should be equal to $numfail# We cannot do foreach() here, as the JOB ids do not necessarily start by zero$failed_jids=0;for ($jobid = $jobstart; $jobid <= $jobend; $jobid++) { $job_return = $fail[$jobid]; if (not defined $job_return ) { # print Dumper(\@fail); die "run.pl: Sanity check failed: we have indication that some jobs are running " . "even after we waited for all jobs to finish" ; } if ($job_return != 0 ){ $failed_jids++;}}if ($failed_jids != $numfail) { die "run.pl: Sanity check failed: cannot find out how many jobs failed ($failed_jids x $numfail)."}if ($numfail > 0) { $ret = 1; }if ($ret != 0) { $njobs = $jobend - $jobstart + 1; if ($njobs == 1) { if (defined $jobname) { $logfile =~ s/$jobname/$jobstart/; # only one numbered job, so replace name with # that job. } print STDERR "run.pl: job failed, log is in $logfile\n"; if ($logfile =~ m/JOB/) { print STDERR "run.pl: probably you forgot to put JOB=1:\$nj in your script."; } } else { $logfile =~ s/$jobname/*/g; print STDERR "run.pl: $numfail / $njobs failed, log is in $logfile\n"; }}# 结束并返回结果exit ($ret);

转载地址:http://dqdmx.baihongyu.com/

你可能感兴趣的文章
HSRP 原理
查看>>
监控队列的脚本
查看>>
我不是九爷 带你了解 CloudStack+XenServer详细部署方案(3):CloudStack管理节点的安装和配置...
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
用户调查报告
查看>>
发布一个php分页类
查看>>
我的友情链接
查看>>
${basePath}
查看>>
linux命令之uniq简单用法
查看>>
使用Eclipse调试Java程序的10个技巧
查看>>
Hive分桶表
查看>>
oracle10g 启动时报错:ORA-32004 ORA-19905
查看>>
思科分发列表过滤路由(RIP)动态路由协议篇
查看>>
PCB应用于哪些方面
查看>>
网络卡顿怎么办?
查看>>
Oracle的rowid
查看>>
Apache源码编译安装详解
查看>>
npm被墙解决方法
查看>>
实现Redis高可用
查看>>