分组处理方式简单介绍

介绍

在一些特定的业务中, 我们更希望成批的处理一些数据而不是来一条处理一条, 比如消息队列处理等场景, 可以一次取几百个一次处理以提高处理能力, 不过这样也带来另一个问题, 生产者短时间内不能生产几百个的时候我们应该怎么处理.

一些软件工具的 client 驱动可能会提供一些定时器的功能, 比如可以处理指定时间内(比如 2s)的数据. 这种情况下我们就可以通过几种方式批量处理数据, 达到指定数量时处理, 达到指定时间时处理, 还有一种变通的方式则是每次批量处理前检查下待处理数据的数量, 取出一定数量的数据即可. 下面的介绍主要是前面两种方式.

计时器系统调用

我们可以通过计时器系统调用简单实现上述提到的第二种方式 - 达到指定时间时处理. 在 linux 系统中, 可以使用 alarm 系统调用实现简单的闹钟功能, 不过在 POSIX 标准中, 提供了比 alarm 更多控制功能的计时器系统调用, 包含以下两个函数:

#include <sys/time.h>
int getitimer (int which, struct itimerval *value);

int setitimer (int which, const struct itimerval *value,
               struct itimerval *ovalue);

itimerval 结构体允许设置定时器过期或终止的时限, 如果设定了值, 就在过期后重启定时器:

struct itimerval {
    struct timeval it_interval;
    struct timeval it_value;
}

计时器和 alarm 操作类似, 但它可以自己自动重启, 并且可以在以下三个模式下工作:

ITIMER_REAL

测量真实事件, 在指定的真实事件过去后, 内核将 SIGALRM 信号发给进程.

ITIMER_VIRTUAL

只在进程用户空间的代码执行时减少. 当指定的进程时间过去后, 内核将 SIGVTALRM 信号发给进程;

ITIMER_PROF

在进程执行以及内核为进程服务时(比如完成一个系统调用)都会减少. 在指定的时间过去后, 内核将 SIGPROF 信号发给进程, 此模式可以和 ITIMER_VIRTUAL 共用, 这样程序可以衡量进程小孩的用户时间和内核时间.

setitimer 函数设置一个过期时间为 it_value 的定时器, 在时长超过 it_value 的时候, 内核使用 it_interval 指定的时长重启定时器, 同时内核也会发送指定的信号给进程. 基于此我们指定好信号处理函数即可简单实现定期处理数据. 很多编程语言都提供了对 setitimer 函数的封装, 比如 Perl 语言:

  use Time::HiRes qw( setitimer getitimer );

  setitimer ($which, $floating_seconds, $floating_interval );
  getitimer ($which);

python 语言的 signal 模块也提供了 setitimergetitimer 两个函数的封装, 下面提供一个简单示例说明同时对数据进行批量处理以及定时器处理.

简单示例

我们直接使用 Perl 语言的 Time::HiRes 模块函数进行说明, 示例中会读取标准输入的数据并存到数组中, 同时使用了两种方式进行打印数组元素处理, 第一种为数组数量满足 3 个的时候, 第二种为每隔 5 秒, $SIG{ALRM} 保存了信号处理函数, splice 函数每次从数组开头端取出指定数量的元素, 如下所示:

#!/usr/bin/env perl
use strict;
use warnings;
use POSIX qw(strftime);
use Time::HiRes qw( setitimer ITIMER_REAL gettimeofday);

my @a;

sub current_time {
  my ($seconds, $micro) = gettimeofday;
  return strftime("%Y-%m-%dT%H:%M:%S", gmtime($seconds))
         . "."
         . $micro;
}

$SIG{ALRM} = sub {
  my $n = @a + 0;
  if($n > 0) {
    my @b = splice @a, 0, $n;
    print current_time . " - "    
          . join(", ", @b) . "\n";
  }
};

setitimer(ITIMER_REAL, 5, 5);

while (<STDIN>) {
  chomp;
  next unless $_;
  push @a, $_;
  if ((@a + 0) % 3 == 0) {
       my @b = splice @a, 0, 3;
       print current_time . " - "    
             . join(", ", @b) . "\n";
  }
}

运行结果, 在 5 秒内手动输入 3 行数字则直接打印, 没有 3 行的数字则每隔 5 秒进行打印:

# perl timer.pl 
123
12
43
2018-06-07T13:23:48.460114 - 123, 12, 43
34
2018-06-07T13:23:55.511901 - 34
3432
21
2018-06-07T13:24:05.511955 - 3432, 21

总结说明

很多编程语言都提供了对定时器系统调用的封装, 上述的示例只是简单的介绍, 如果需要考虑数据保护, 则需要做更多的处理. 当然如果一些工具(比如 rabbitmq, rocketmq等) 的 client 驱动支持定时功能,数据的处理方式就会更为简化. 上文提到变通方式则更容易理解, 如果能有简单方法获取到数量数据的批量处理也会方便很多.

另外在文章 [表数据批量清理]/%E8%A1%A8%E8%AE%B0%E5%BD%95%E6%B8%85%E7%90%86%E6%B3%A8%E6%84%8F%E4%BA%8B%E9%A1%B9/) 我们也提到了批量处理表数据的注意事项, 和本文中提到的方式有点类似.