redis-queue多进程下消费如何保证不重复入库的问题

864328615

问题描述

大概是我这边在处理队列数据时需要判断表中是否存在,不存在就向表中插入数据,存在的话就更新这条数据,这个逻辑在单进程下正常,但是多进程下,会出现重复入库的问题。

下面代码:以day为条件查询是否存在了当天的统计记录,存在就更新统计,不存在就新增一条当天的统计记录,但是多个进程下,好多个进程取到的都是当天不同时刻的数据,我这边判断只能以日期判断,造成数据重复入库问题。想过在表中加入唯一索引,这样写入时会抛出异常,当前数据重回队列等待下次消费,但是x次后数据就被丢弃了,对这个当日统计记录来说会存在丢数据的风险,求大佬给指点下。

$row = Db::table('statistic')->where('day', $day)->first();
        if ($row) {
            $data = [
                'count'         => $row->count + 1,
                'cost'          => $row->cost + $costTime,
                'success_count' => $row->success_count + ($success ? 1 : 0),
                'error_count'   => $row->error_count + ($success ? 0 : 1),
            ];
            Db::table('statistic')->where('day', $day)->update($data);
        } else {
            $data = [
                'day'           => $day,
                'count'         => 1,
                'cost'          => $costTime,
                'success_count' => $success ? 1 : 0,
                'error_count'   => $success ? 0 : 1,
            ];
            Db::table('statistic')->insert($data);
        }

本问题不在讨论,感觉是弯路,但是下面大佬关于锁的指点确实很有启发,大家看到了只学习大佬的思路,不要看我那个代码逻辑了,误人子弟

1453 13 3
13个回答

ak47f16200
  1. 数据加悲观锁
  2. 加唯一索引,处理异常(更新)
  3. 提前建好,队列直接更新操作
  • 864328615 2023-08-10

    不知道咋搞了,我看了下,大概有13个统计表,要是都提前建好,我就不太需要队列处理了,被关锁的话几乎就把多进程强制单进程了,那个加唯一索引 异常处理貌似可以,我测试下

  • efnic 2023-08-10
864328615
$transferData = self::$transferData;
        $project = $transferData['project'];
        $time = date('Y-m-d H:i:s');
        try{
            Db::table('project')->insert([
                'project'    => $transferData['project'],
                'created_at' => $time,
                'updated_at' => $time
            ]);
        }catch (\Throwable $th){
            Db::table('project')->where('project', $project)->update([
                'updated_at' => $time
            ]);
        }

表中加入了唯一索引,代码调成这种后会出现mysql死锁的情况

  • 864328615 2023-08-10

    SQLSTATE[40001]: Serialization failure: 1213 Deadlock found when trying to get lock; try restarting transaction (SQL: update project set updated_at = 2023-08-10 12:10:37 where project = 营销系统)

  • 软饭工程师 2023-08-13

    我记得我好像在哪看过开发规范,不建议在catch 异常捕获中做业务处理

efnic

安装插件:https://www.workerman.net/plugin/55

创建统计锁

/**
 * 统计锁
 * @method static LockInterface lock(string $day, ?float $ttl = null, ?bool $autoRelease = null) 数据库操作锁
 */
class StatisticLocker extends Locker
{
}

使用锁

//取锁:有效期3秒,自释放
$lock = \app\services\lock\StatisticLocker::lock($day, 3, true);
//多进程阻塞等待
if ($lock->acquire(true)) {
    $row = Db::table('statistic')->where('day', $day)->first();
    if ($row) {
        $data = [
            'count'         => $row->count + 1,
            'cost'          => $row->cost + $costTime,
            'success_count' => $row->success_count + ($success ? 1 : 0),
            'error_count'   => $row->error_count + ($success ? 0 : 1),
        ];
        Db::table('statistic')->where('day', $day)->update($data);
    } else {
        $data = [
            'day'           => $day,
            'count'         => 1,
            'cost'          => $costTime,
            'success_count' => $success ? 1 : 0,
            'error_count'   => $success ? 0 : 1,
        ];
        Db::table('statistic')->insert($data);
    }
}
//代码执行到这里,会立刻自动释放锁
  • 864328615 2023-08-10

    谢谢大佬 我跑下测试下

  • 864328615 2023-08-10

    大佬 我有十多个表要更新,我实际业务是写了10多个方法,我粘贴出来

864328615
class SyncStatisticsToMysql implements Consumer
{
    // 要消费的队列名
    public string $queue = 'sync-statistics-to-mysql';

    // 连接名,对应 plugin/webman/redis-queue/redis.php 里的连接`
    public string $connection = 'default';

    // 消费的数据
    public static array $transferData = [];

    // 消费
    public function consume($data)
    {
        self::$transferData = $data;
        Db::beginTransaction();
        try {
            // 组装数据
            self::assembleData();
            // 应用
            self::project();
            // 调用记录
            self::tracing();
            // 总统计
            self::statistic();
            // 应用总统计
            self::statisticProject();
            // 应用每分钟统计
            self::statisticProjectInterval();
            // 应用调用统计
            self::statisticProjectTransfer();
            // 应用调用每分钟统计
            self::statisticProjectTransferInterval();
            // 应用状态码统计
            self::statisticProjectCode();
            // 应用状态码每分钟统计
            self::statisticProjectCodeInterval();
            // 应用IP统计
            self::statisticProjectIp();
            // 应用IP每分钟统计
            self::statisticProjectIpInterval();
            // 应用IP状态码统计
            self::statisticProjectIpCode();
            // 应用IP状态码每分钟统计
            self::statisticProjectIpCodeInterval();
            // 应用IP调用统计
            self::statisticProjectIpTransfer();
            // 应用IP调用每分钟统计
            self::statisticProjectIpTransferInterval();
            Db::commit();
        } catch (\Throwable $th) {
            Db::rollBack();
            var_dump($th->getMessage());
            throw new ServerErrorHttpException($th->getMessage());
        }
    }

    /**
     * 组装数据
     */
    protected static function assembleData()
    {
        $transferData = self::$transferData;
        $names = explode('-', $transferData['project']);
        $transferData['project'] = $names[0];
        $transferData['type'] = $names[1] ?? 'All';
        $timer = strtotime($transferData['time']);
        $transferData['day'] = date('Ymd', $timer);
        $transferData['minute'] = date('YmdHi', $timer);
        $transferData['trace'] = uniqid();
        self::$transferData = $transferData;
    }

    /**
     * 应用
     */
    protected static function project()
    {
        $transferData = self::$transferData;
        $project = $transferData['project'];
        $time = date('Y-m-d H:i:s');
        try {
            Db::table('project')->insert([
                'project'    => $transferData['project'],
                'created_at' => $time,
                'updated_at' => $time
            ]);
        } catch (\PDOException | \Exception $e) {
            Db::table('project')->where('project', $project)->update([
                'updated_at' => $time
            ]);
        }
    }

    /**
     * 调用记录
     */
    protected static function tracing()
    {
        $transferData = self::$transferData;
        $insert = [
            'day'       => $transferData['day'],
            'time'      => $transferData['time'],
            'trace'     => $transferData['trace'],
            'project'   => $transferData['project'],
            'type'      => $transferData['type'],
            'ip'        => $transferData['ip'],
            'transfer'  => $transferData['transfer'],
            'cost_time' => $transferData['costTime'],
            'success'   => $transferData['success'],
            'code'      => $transferData['code'],
            'details'   => $transferData['details'],
        ];
        Db::table('tracing')->insert($insert);
    }

    /**
     * 总统计
     */
    public static function statistic()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $success = 1 === $transferData['success'];
        $costTime = $transferData['costTime'];
        try {
            $data = [
                'day'           => $day,
                'count'         => 1,
                'cost'          => $costTime,
                'success_count' => $success ? 1 : 0,
                'error_count'   => $success ? 0 : 1,
            ];
            Db::table('statistic')->insert($data);
        } catch (\PDOException | \Exception $e) {
            $data = [
                'count'         => Db::raw('count + 1'),
                'cost'          => Db::raw("cost + {$costTime}"),
            ];
            if($success){
                $data['success_count'] = Db::raw('success_count + 1');
            }else{
                $data['error_count'] = Db::raw('error_count + 1');
            }
            Db::table('statistic')->where('day', $day)
                ->update($data);
        }
    }

    /**
     * 应用总统计
     */
    public static function statisticProject()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $success = 1 === $transferData['success'];
        $costTime = $transferData['costTime'];
        try {
            $data = [
                'day'           => $day,
                'project'       => $project,
                'type'          => $type,
                'count'         => 1,
                'cost'          => $costTime,
                'success_count' => $success ? 1 : 0,
                'error_count'   => $success ? 0 : 1,
            ];
            Db::table('statistic_project')->insert($data);
        } catch (\PDOException | \Exception $e) {
            $where = [
                ['day', '=', $day],
                ['project', '=', $project],
                ['type', '=', $type],
            ];
            $data = [
                'count'         => Db::raw('count + 1'),
                'cost'          => Db::raw("cost + {$costTime}"),
            ];
            if($success){
                $data['success_count'] = Db::raw('success_count + 1');
            }else{
                $data['error_count'] = Db::raw('error_count + 1');
            }
            Db::table('statistic_project')
                ->where($where)
                ->update($data);
        }
    }

    /**
     * 应用每分钟统计
     */
    public static function statisticProjectInterval()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $minute = $transferData['minute'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $success = 1 === $transferData['success'];
        $costTime = $transferData['costTime'];
        try {
            $data = [
                'day'           => $day,
                'time'          => $minute,
                'project'       => $project,
                'type'          => $type,
                'count'         => 1,
                'cost'          => $costTime,
                'success_count' => $success ? 1 : 0,
                'error_count'   => $success ? 0 : 1,
            ];
            Db::table('statistic_project_interval')->insert($data);
        } catch (\PDOException | \Exception $e) {
            $where = [
                ['day', '=', $day],
                ['time', '=', $minute],
                ['project', '=', $project],
                ['type', '=', $type],
            ];
            $data = [
                'count'         => Db::raw('count + 1'),
                'cost'          => Db::raw("cost + {$costTime}"),
            ];
            if($success){
                $data['success_count'] = Db::raw('success_count + 1');
            }else{
                $data['error_count'] = Db::raw('error_count + 1');
            }
            Db::table('statistic_project_interval')
                ->where($where)
                ->update($data);
        }
    }

    /**
     * 应用调用统计
     */
    public static function statisticProjectTransfer()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $transfer = $transferData['transfer'];
        $success = 1 === $transferData['success'];
        try {
            $data = [
                'day'           => $day,
                'project'       => $project,
                'type'          => $type,
                'transfer'      => $transfer,
                'count'         => 1,
                'cost'          => $costTime,
                'success_count' => $success ? 1 : 0,
                'error_count'   => $success ? 0 : 1,
            ];
            Db::table('statistic_project_transfer')->insert($data);
        } catch (\PDOException | \Exception $e) {
            $where = [
                ['day', '=', $day],
                ['project', '=', $project],
                ['type', '=', $type],
                ['transfer', '=', $transfer]
            ];
            $data = [
                'count'         => Db::raw('count + 1'),
                'cost'          => Db::raw("cost + {$costTime}"),
            ];
            if($success){
                $data['success_count'] = Db::raw('success_count + 1');
            }else{
                $data['error_count'] = Db::raw('error_count + 1');
            }
            Db::table('statistic_project_transfer')
                ->where($where)
                ->update($data);
        }
    }

    /**
     * 应用调用每分钟统计
     */
    public static function statisticProjectTransferInterval()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $minute = $transferData['minute'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $transfer = $transferData['transfer'];
        $success = 1 === $transferData['success'];
        try {
            $data = [
                'day'           => $day,
                'time'          => $minute,
                'project'       => $project,
                'type'          => $type,
                'transfer'      => $transfer,
                'count'         => 1,
                'cost'          => $costTime,
                'success_count' => $success ? 1 : 0,
                'error_count'   => $success ? 0 : 1,
            ];
            Db::table('statistic_project_transfer_interval')->insert($data);
        } catch (\PDOException | \Exception $e) {
            $where = [
                ['day', '=', $day],
                ['time', '=', $minute],
                ['project', '=', $project],
                ['type', '=', $type],
                ['transfer', '=', $transfer]
            ];
            $data = [
                'count'         => Db::raw('count + 1'),
                'cost'          => Db::raw("cost + {$costTime}"),
            ];
            if($success){
                $data['success_count'] = Db::raw('success_count + 1');
            }else{
                $data['error_count'] = Db::raw('error_count + 1');
            }
            Db::table('statistic_project_ip_transfer_interval')
                ->where($where)
                ->update($data);
        }
    }

    /**
     * 应用状态码统计
     */
    public static function statisticProjectCode()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $code = $transferData['code'];
        try {
            $data = [
                'day'     => $day,
                'project' => $project,
                'type'    => $type,
                'code'    => $code,
                'count'   => 1,
                'cost'    => $costTime,
            ];
            Db::table('statistic_project_code')->insert($data);
        } catch (\PDOException | \Exception $e) {
            $where = [
                ['day', '=', $day],
                ['code', '=', $code],
                ['project', '=', $project],
                ['type', '=', $type]
            ];
            Db::table('statistic_project_code')
                ->where($where)
                ->update([
                    'count' => Db::raw('count + 1'),
                    'cost'  => Db::raw("cost + {$costTime}"),
                ]);
        }
    }

    /**
     * 应用状态码每分钟统计
     */
    public static function statisticProjectCodeInterval()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $minute = $transferData['minute'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $code = $transferData['code'];
        try {
            $data = [
                'day'     => $day,
                'time'    => $minute,
                'project' => $project,
                'type'    => $type,
                'code'    => $code,
                'count'   => 1,
                'cost'    => $costTime,
            ];
            Db::table('statistic_project_code_interval')->insert($data);
        } catch (\PDOException | \Exception $e) {
            $where = [
                ['day', '=', $day],
                ['time', '=', $minute],
                ['code', '=', $code],
                ['project', '=', $project],
                ['type', '=', $type]
            ];
            Db::table('statistic_project_code_interval')
                ->where($where)
                ->update([
                    'count' => Db::raw('count + 1'),
                    'cost'  => Db::raw("cost + {$costTime}"),
                ]);
        }
    }

    /**
     * 应用IP统计
     */
    public static function statisticProjectIp()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $success = 1 === $transferData['success'];
        $ip = $transferData['ip'];
        try {
            $data = [
                'day'           => $day,
                'project'       => $project,
                'type'          => $type,
                'ip'            => $ip,
                'count'         => 1,
                'cost'          => $costTime,
                'success_count' => $success ? 1 : 0,
                'error_count'   => $success ? 0 : 1,
            ];
            Db::table('statistic_project_ip')->insert($data);
        } catch (\PDOException | \Exception $e) {
            $where = [
                ['day', '=', $day],
                ['project', '=', $project],
                ['type', '=', $type],
                ['ip', '=', $ip],
            ];

            $data = [
                'count'         => Db::raw('count + 1'),
                'cost'          => Db::raw("cost + {$costTime}"),
            ];
            if($success){
                $data['success_count'] = Db::raw('success_count + 1');
            }else{
                $data['error_count'] = Db::raw('error_count + 1');
            }
            Db::table('statistic_project_ip')
                ->where($where)
                ->update($data);
        }
    }

    /**
     * 应用IP每分钟统计
     */
    public static function statisticProjectIpInterval()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $minute = $transferData['minute'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $success = 1 === $transferData['success'];
        $ip = $transferData['ip'];

        try {
            $data = [
                'day'           => $day,
                'time'          => $minute,
                'project'       => $project,
                'type'          => $type,
                'ip'            => $ip,
                'count'         => 1,
                'cost'          => $costTime,
                'success_count' => $success ? 1 : 0,
                'error_count'   => $success ? 0 : 1,
            ];
            Db::table('statistic_project_ip_interval')->insert($data);
        } catch (\PDOException | \Exception $e) {
            $where = [
                ['day', '=', $day],
                ['time', '=', $minute],
                ['project', '=', $project],
                ['type', '=', $type],
                ['ip', '=', $ip],
            ];

            $data = [
                'count'         => Db::raw('count + 1'),
                'cost'          => Db::raw("cost + {$costTime}"),
            ];
            if($success){
                $data['success_count'] = Db::raw('success_count + 1');
            }else{
                $data['error_count'] = Db::raw('error_count + 1');
            }
            Db::table('statistic_project_ip_interval')
                ->where($where)
                ->update($data);
        }
    }

    /**
     * 应用IP状态码统计
     */
    public static function statisticProjectIpCode()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $ip = $transferData['ip'];
        $code = $transferData['code'];
        try {
            $data = [
                'day'     => $day,
                'project' => $project,
                'type'    => $type,
                'ip'      => $ip,
                'code'    => $code,
                'count'   => 1,
                'cost'    => $costTime,
            ];
            Db::table('statistic_project_ip_code')->insert($data);
        } catch (\PDOException | \Exception $e) {
            $where = [
                ['day', '=', $day],
                ['code', '=', $code],
                ['project', '=', $project],
                ['type', '=', $type],
                ['ip', '=', $ip]
            ];
            Db::table('statistic_project_ip_code')
                ->where($where)
                ->update([
                    'count' => Db::raw('count + 1'),
                    'cost'  => Db::raw("cost + {$costTime}"),
                ]);
        }
    }

    /**
     * 应用IP状态码每分钟统计
     */
    public static function statisticProjectIpCodeInterval()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $minute = $transferData['minute'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $ip = $transferData['ip'];
        $code = $transferData['code'];

        try {
            $data = [
                'day'     => $day,
                'time'    => $minute,
                'project' => $project,
                'type'    => $type,
                'ip'      => $ip,
                'code'    => $code,
                'count'   => 1,
                'cost'    => $costTime,
            ];
            Db::table('statistic_project_ip_code_interval')->insert($data);
        } catch (\PDOException | \Exception $e) {
            $where = [
                ['day', '=', $day],
                ['time', '=', $minute],
                ['project', '=', $project],
                ['type', '=', $type],
                ['ip', '=', $ip],
                ['code', '=', $code],
            ];
            Db::table('statistic_project_ip_code_interval')
                ->where($where)
                ->update([
                    'count' => Db::raw('count + 1'),
                    'cost'  => Db::raw("cost + {$costTime}"),
                ]);
        }
    }

    /**
     * 应用IP调用统计
     */
    public static function statisticProjectIpTransfer()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $ip = $transferData['ip'];
        $transfer = $transferData['transfer'];
        $success = 1 === $transferData['success'];

        try {
            $data = [
                'day'           => $day,
                'project'       => $project,
                'type'          => $type,
                'ip'            => $ip,
                'transfer'      => $transfer,
                'count'         => 1,
                'cost'          => $costTime,
                'success_count' => $success ? 1 : 0,
                'error_count'   => $success ? 0 : 1,
            ];
            Db::table('statistic_project_ip_transfer')->insert($data);
        } catch (\PDOException | \Exception $e) {
            $where = [
                ['day', '=', $day],
                ['project', '=', $project],
                ['type', '=', $type],
                ['ip', '=', $ip],
                ['transfer', '=', $transfer]
            ];
            $data = [
                'count'         => Db::raw('count + 1'),
                'cost'          => Db::raw("cost + {$costTime}"),
            ];
            if($success){
                $data['success_count'] = Db::raw('success_count + 1');
            }else{
                $data['error_count'] = Db::raw('error_count + 1');
            }
            Db::table('statistic_project_ip_transfer')
                ->where($where)
                ->update($data);
        }
    }

    /**
     * 应用IP调用每分钟统计
     */
    public static function statisticProjectIpTransferInterval()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $minute = $transferData['minute'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $ip = $transferData['ip'];
        $transfer = $transferData['transfer'];
        $success = 1 === $transferData['success'];

        try {
            $data = [
                'day'           => $day,
                'time'          => $minute,
                'project'       => $project,
                'type'          => $type,
                'ip'            => $ip,
                'transfer'      => $transfer,
                'count'         => 1,
                'cost'          => $costTime,
                'success_count' => $success ? 1 : 0,
                'error_count'   => $success ? 0 : 1,
            ];
            Db::table('statistic_project_ip_transfer_interval')->insert($data);
        } catch (\PDOException | \Exception $e) {
            $where = [
                ['day', '=', $day],
                ['time', '=', $minute],
                ['project', '=', $project],
                ['type', '=', $type],
                ['ip', '=', $ip],
                ['transfer', '=', $transfer]
            ];

            $data = [
                'count'         => Db::raw('count + 1'),
                'cost'          => Db::raw("cost + {$costTime}"),
            ];
            if($success){
                $data['success_count'] = Db::raw('success_count + 1');
            }else{
                $data['error_count'] = Db::raw('error_count + 1');
            }
            Db::table('statistic_project_ip_transfer_interval')
                ->where($where)
                ->update($data);
        }
    }
}

有个statistics的统计系统,我抄的这个,但是他的貌似是定时器加redis实现的,我想用队列的多进程

  • 864328615 2023-08-10

    现在是按第一个大佬加表唯一索引写的代码,但是多进程会出现mysql死锁问题,单进程完全没问题

  • efnic 2023-08-10

    你这种写法数据库的压力有点大;

    1. 消费者处理队列消息时,不要单条插入数据库(单条插入的效率很低,最好是批量插入);
    2. 把更新数据的操作,改为redis自增;
    3. 定时+批量保存进数据
  • 864328615 2023-08-10

    我简单压测了下,执行时间大部分在0.5s以下,个别在1秒多,电脑4h8g的,确实很耗时,但是这程序只要能正常运行都没问题,前期都不考虑数据库压力了,我担心多进程如果加入了redis锁 会不会更加耗时的问题

efnic
/**
 * 统计锁
 * @method static LockInterface lock(string $day, ?float $ttl = null, ?bool $autoRelease = null) 数据库操作锁
 * @method static LockInterface project(string $project, ?float $ttl = null, ?bool $autoRelease = null) 应用锁
 * @method static LockInterface statistic(string $day, ?float $ttl = null, ?bool $autoRelease = null) 总统计锁
 * @method static LockInterface statisticProject(string $day_project_type, ?float $ttl = null, ?bool $autoRelease = null) 应用总统计
 * @method static LockInterface statisticProjectInterval(string $day_minute_project_type, ?float $ttl = null, ?bool $autoRelease = null) 应用每分钟统计
 * @method static LockInterface statisticProjectTransfer(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用调用统计
 * @method static LockInterface statisticProjectTransferInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用调用每分钟统计
 * @method static LockInterface statisticProjectCode(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用状态码统计
 * @method static LockInterface statisticProjectCodeInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用状态码每分钟统计
 * @method static LockInterface statisticProjectIp(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用IP统计
 * @method static LockInterface statisticProjectIpInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用IP每分钟统计
 * @method static LockInterface statisticProjectIpCode(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用IP状态码统计
 * @method static LockInterface statisticProjectIpCodeInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用IP状态码每分钟统计
 * @method static LockInterface statisticProjectIpTransfer(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用IP调用统计
 * @method static LockInterface statisticProjectIpTransferInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用IP调用每分钟统计
 */
class StatisticLocker extends Locker
{

}
  • efnic 2023-08-10

    原则就是:直接插入的不需要锁;插入或者更新的才需要锁。
    按照不同的约束类型,自己拼接锁的key

  • efnic 2023-08-10

    最好的方法是先内存计算,再批量写数据库。

  • 864328615 2023-08-10

    明白了 大佬 我尝试队列里不在进行数据库操作,只计算业务结果后保留到redis里,然后开启定时器扫描redis批量入库

  • 864328615 2023-08-10

    大佬那个高性能的代码为啥删了啊,我学习下啊,就是那个不读直接更新那里我没看明白,不读的话我怎么确定是更新还是插入,那个goto设计的真好,我这种低端码农都没用过goto

efnic
// 提高性能
if (\support\Redis::exists($key)) {
    goto update_db;
}
//取锁:有效期3秒,自释放
$lock = \app\services\lock\StatisticLocker::lock($day, 3, true);
//多进程阻塞等待
if ($lock->acquire(true)) {
    $row = Db::table('statistic')->where('day', $day)->first();
    if ($row) {
        update_db:
        //这里不要用查询出的递增字段的值,因为多进程下存在并发,数据会不准!!
        //最佳实践,根据查询出的数据,拼接$where,而$data使用数据库原生语法
        Db::table('statistic')->where($where)->update($data);
    } else {
        $data = [
            'day'           => $day,
            'count'         => 1,
            'cost'          => $costTime,
            'success_count' => $success ? 1 : 0,
            'error_count'   => $success ? 0 : 1,
        ];
        Db::table('statistic')->insert($data);
        //设置标志
        \support\Redis::setEx($key, 300, time());
    }
}
//代码执行到这里,会立刻自动释放锁
  • efnic 2023-08-10

    执行流程:
    1、标志位的key,自己根据业务组装;
    2、不存在标志位时,多进程阻塞等待 获取锁;
    3、插入成功后,设置标志位;
    4、队列再有数据进来,存在标志位时,多进程同步更新;

  • 864328615 2023-08-10

    谢谢大佬们,受益了

  • ak47f16200 2023-08-10

    这种多进程处理队列只能有一个干活,其他的都会处于等待中吧

  • efnic 2023-08-10

    创建,只能有一个;更新是并行的。
    看写的人吧。

864328615
<?php

namespace app\queue\redis;

use app\services\lock\StatisticLocker;
use support\Db;
use support\Redis;
use Tinywan\ExceptionHandler\Exception\ServerErrorHttpException;
use Webman\RedisQueue\Consumer;

class SyncStatisticsToMysql implements Consumer
{
    // 要消费的队列名
    public string $queue = 'sync-statistics-to-mysql';

    // 连接名,对应 plugin/webman/redis-queue/redis.php 里的连接`
    public string $connection = 'default';

    // 消费的数据
    public static array $transferData = [];

    public static $num = 0;

    // 消费
    public function consume($data)
    {
        $startTime = microtime(true);
        self::$transferData = $data;
        try {
            // 组装数据
            self::assembleData();
            // 调用记录
            self::tracing();
            // 应用
            self::project();
            // 总统计
            self::statistic();
            // 应用总统计
            self::statisticProject();
            // 应用每分钟统计
            self::statisticProjectInterval();
            // 应用调用统计
            self::statisticProjectTransfer();
            // 应用调用每分钟统计
            self::statisticProjectTransferInterval();
            // 应用状态码统计
            self::statisticProjectCode();
            // 应用状态码每分钟统计
            self::statisticProjectCodeInterval();
            // 应用IP统计
            self::statisticProjectIp();
            // 应用IP每分钟统计
            self::statisticProjectIpInterval();
            // 应用IP状态码统计
            self::statisticProjectIpCode();
            // 应用IP状态码每分钟统计
            self::statisticProjectIpCodeInterval();
            // 应用IP调用统计
            self::statisticProjectIpTransfer();
            //应用IP调用每分钟统计
            self::statisticProjectIpTransferInterval();
        } catch (\Throwable $th) {
            var_dump($th->getMessage());
            throw new ServerErrorHttpException($th->getMessage());
        }
        $endTime = microtime(true);
        self::$num += $endTime - $startTime;
        var_dump(self::$num);
    }

    /**
     * 组装数据
     */
    protected static function assembleData()
    {
        $transferData = self::$transferData;
        $names = explode('-', $transferData['project']);
        $transferData['project'] = $names[0];
        $transferData['type'] = $names[1] ?? 'All';
        $timer = strtotime($transferData['time']);
        $transferData['day'] = date('Ymd', $timer);
        $transferData['minute'] = date('YmdHi', $timer);
        $transferData['trace'] = uniqid();
        $transferData['project_key'] = md5($transferData['project']);
        $transferData['statistic_key'] = md5($transferData['day']);
        $transferData['statistic_project_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type']);
        $transferData['statistic_project_interval_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['minute']);
        $transferData['statistic_project_transfer_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['transfer']);
        $transferData['statistic_project_transfer_interval_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['transfer'] . $transferData['minute']);
        $transferData['statistic_project_code_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['code']);
        $transferData['statistic_project_code_interval_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['code'] . $transferData['minute']);
        $transferData['statistic_project_ip_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['ip']);
        $transferData['statistic_project_ip_interval_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['ip'] . $transferData['minute']);
        $transferData['statistic_project_ip_code_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['ip'] . $transferData['code']);
        $transferData['statistic_project_ip_code_interval_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['ip'] . $transferData['code'] . $transferData['minute']);
        $transferData['statistic_project_ip_transfer_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['ip'] . $transferData['transfer']);
        $transferData['statistic_project_ip_transfer_interval_key'] = md5($transferData['day'] . $transferData['project'] . $transferData['type'] . $transferData['ip'] . $transferData['transfer'] . $transferData['minute']);
        self::$transferData = $transferData;
    }

    /**
     * 调用记录
     */
    protected static function tracing()
    {
        $transferData = self::$transferData;
        $insert = [
            'day'       => $transferData['day'],
            'time'      => $transferData['time'],
            'trace'     => $transferData['trace'],
            'project'   => $transferData['project'],
            'type'      => $transferData['type'],
            'ip'        => $transferData['ip'],
            'transfer'  => $transferData['transfer'],
            'cost_time' => $transferData['costTime'],
            'success'   => $transferData['success'],
            'code'      => $transferData['code'],
            'details'   => $transferData['details'],
        ];
        Db::table('tracing')->insert($insert);
    }

    /**
     * 应用
     */
    protected static function project()
    {
        $transferData = self::$transferData;
        $project = $transferData['project'];
        $time = date('Y-m-d H:i:s');
        $key = $transferData['project_key'];
        if (Redis::exists('key:' . $key)) {
            goto update_db;
        }
        //取锁:有效期3秒,自释放
        $lock = StatisticLocker::lock($key, 3, true);

        //多进程阻塞等待
        if ($lock->acquire(true)) {
            $row = Db::table('project')->where('key', $key)->first();
            if ($row) {
                update_db:
                //这里不要用查询出的递增字段的值,因为多进程下存在并发,数据会不准!!
                //最佳实践,根据查询出的数据,拼接$where,而$data使用数据库原生语法
                Db::table('project')->where('key', $key)->update([
                    'updated_at' => $time
                ]);
            } else {
                $data = [
                    'key'        => $key,
                    'project'    => $project,
                    'created_at' => $time,
                    'updated_at' => $time
                ];
                Db::table('project')->insert($data);
                //设置标志
                Redis::setEx('key:' . $key, 300, time());
            }
        }
        //代码执行到这里,会立刻自动释放锁
    }

    /**
     * 总统计
     */
    public static function statistic()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $success = 1 === $transferData['success'];
        $costTime = $transferData['costTime'];
        $key = $transferData['statistic_key'];
        if (Redis::exists('key:' . $key)) {
            goto update_db;
        }
        //取锁:有效期3秒,自释放
        $lock = StatisticLocker::lock($key, 3, true);

        //多进程阻塞等待
        if ($lock->acquire(true)) {
            $row = Db::table('statistic')->where('key', $key)->first();
            if ($row) {
                update_db:
                //这里不要用查询出的递增字段的值,因为多进程下存在并发,数据会不准!!
                //最佳实践,根据查询出的数据,拼接$where,而$data使用数据库原生语法
                $data = [
                    'count' => Db::raw('count + 1'),
                    'cost'  => Db::raw("cost + {$costTime}"),
                ];
                if ($success) {
                    $data['success_count'] = Db::raw('success_count + 1');
                } else {
                    $data['error_count'] = Db::raw('error_count + 1');
                }
                Db::table('statistic')->where('key', $key)
                    ->update($data);
            } else {
                $data = [
                    'key'           => $key,
                    'day'           => $day,
                    'count'         => 1,
                    'cost'          => $costTime,
                    'success_count' => $success ? 1 : 0,
                    'error_count'   => $success ? 0 : 1,
                ];
                Db::table('statistic')->insert($data);
                //设置标志
                Redis::setEx('key:' . $key, 300, time());
            }
        }
    }

    /**
     * 应用总统计
     */
    public static function statisticProject()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $success = 1 === $transferData['success'];
        $costTime = $transferData['costTime'];
        $key = $transferData['statistic_project_key'];
        if (Redis::exists('key:' . $key)) {
            goto update_db;
        }
        //取锁:有效期3秒,自释放
        $lock = StatisticLocker::lock($key, 3, true);

        //多进程阻塞等待
        if ($lock->acquire(true)) {
            $row = Db::table('statistic_project')->where('key', $key)->first();
            if ($row) {
                update_db:
                //这里不要用查询出的递增字段的值,因为多进程下存在并发,数据会不准!!
                //最佳实践,根据查询出的数据,拼接$where,而$data使用数据库原生语法
                $data = [
                    'count' => Db::raw('count + 1'),
                    'cost'  => Db::raw("cost + {$costTime}"),
                ];
                if ($success) {
                    $data['success_count'] = Db::raw('success_count + 1');
                } else {
                    $data['error_count'] = Db::raw('error_count + 1');
                }
                Db::table('statistic_project')
                    ->where('key', $key)
                    ->update($data);
            } else {
                $data = [
                    'key'           => $key,
                    'day'           => $day,
                    'project'       => $project,
                    'type'          => $type,
                    'count'         => 1,
                    'cost'          => $costTime,
                    'success_count' => $success ? 1 : 0,
                    'error_count'   => $success ? 0 : 1,
                ];
                Db::table('statistic_project')->insert($data);
                //设置标志
                Redis::setEx('key:' . $key, 300, time());
            }
        }
    }

    /**
     * 应用每分钟统计
     */
    public static function statisticProjectInterval()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $minute = $transferData['minute'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $success = 1 === $transferData['success'];
        $costTime = $transferData['costTime'];
        $key = $transferData['statistic_project_interval_key'];
        if (Redis::exists('key:' . $key)) {
            goto update_db;
        }
        //取锁:有效期3秒,自释放
        $lock = StatisticLocker::lock($key, 3, true);

        //多进程阻塞等待
        if ($lock->acquire(true)) {
            $row = Db::table('statistic_project_interval')->where('key', $key)->first();
            if ($row) {
                update_db:
                //这里不要用查询出的递增字段的值,因为多进程下存在并发,数据会不准!!
                //最佳实践,根据查询出的数据,拼接$where,而$data使用数据库原生语法
                $data = [
                    'count' => Db::raw('count + 1'),
                    'cost'  => Db::raw("cost + {$costTime}"),
                ];
                if ($success) {
                    $data['success_count'] = Db::raw('success_count + 1');
                } else {
                    $data['error_count'] = Db::raw('error_count + 1');
                }
                Db::table('statistic_project_interval')
                    ->where('key', $key)
                    ->update($data);
            } else {
                $data = [
                    'key'           => $key,
                    'day'           => $day,
                    'time'          => $minute,
                    'project'       => $project,
                    'type'          => $type,
                    'count'         => 1,
                    'cost'          => $costTime,
                    'success_count' => $success ? 1 : 0,
                    'error_count'   => $success ? 0 : 1,
                ];
                Db::table('statistic_project_interval')->insert($data);
                //设置标志
                Redis::setEx('key:' . $key, 300, time());
            }
        }
    }

    /**
     * 应用调用统计
     */
    public static function statisticProjectTransfer()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $transfer = $transferData['transfer'];
        $success = 1 === $transferData['success'];
        $key = $transferData['statistic_project_transfer_key'];
        if (Redis::exists('key:' . $key)) {
            goto update_db;
        }
        //取锁:有效期3秒,自释放
        $lock = StatisticLocker::lock($key, 3, true);

        //多进程阻塞等待
        if ($lock->acquire(true)) {
            $row = Db::table('statistic_project_transfer')->where('key', $key)->first();
            if ($row) {
                update_db:
                //这里不要用查询出的递增字段的值,因为多进程下存在并发,数据会不准!!
                //最佳实践,根据查询出的数据,拼接$where,而$data使用数据库原生语法
                $data = [
                    'count' => Db::raw('count + 1'),
                    'cost'  => Db::raw("cost + {$costTime}"),
                ];
                if ($success) {
                    $data['success_count'] = Db::raw('success_count + 1');
                } else {
                    $data['error_count'] = Db::raw('error_count + 1');
                }
                Db::table('statistic_project_transfer')
                    ->where('key', $key)
                    ->update($data);
            } else {
                $data = [
                    'key'           => $key,
                    'day'           => $day,
                    'project'       => $project,
                    'type'          => $type,
                    'transfer'      => $transfer,
                    'count'         => 1,
                    'cost'          => $costTime,
                    'success_count' => $success ? 1 : 0,
                    'error_count'   => $success ? 0 : 1,
                ];
                Db::table('statistic_project_transfer')->insert($data);
                //设置标志
                Redis::setEx('key:' . $key, 300, time());
            }
        }
    }

    /**
     * 应用调用每分钟统计
     */
    public static function statisticProjectTransferInterval()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $minute = $transferData['minute'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $transfer = $transferData['transfer'];
        $success = 1 === $transferData['success'];
        $key = $transferData['statistic_project_transfer_interval_key'];
        if (Redis::exists('key:' . $key)) {
            goto update_db;
        }
        //取锁:有效期3秒,自释放
        $lock = StatisticLocker::lock($key, 3, true);

        //多进程阻塞等待
        if ($lock->acquire(true)) {
            $row = Db::table('statistic_project_transfer_interval')->where('key', $key)->first();
            if ($row) {
                update_db:
                //这里不要用查询出的递增字段的值,因为多进程下存在并发,数据会不准!!
                //最佳实践,根据查询出的数据,拼接$where,而$data使用数据库原生语法
                $data = [
                    'count' => Db::raw('count + 1'),
                    'cost'  => Db::raw("cost + {$costTime}"),
                ];
                if ($success) {
                    $data['success_count'] = Db::raw('success_count + 1');
                } else {
                    $data['error_count'] = Db::raw('error_count + 1');
                }
                Db::table('statistic_project_transfer_interval')
                    ->where('key', $key)
                    ->update($data);
            } else {
                $data = [
                    'key'           => $key,
                    'day'           => $day,
                    'time'          => $minute,
                    'project'       => $project,
                    'type'          => $type,
                    'transfer'      => $transfer,
                    'count'         => 1,
                    'cost'          => $costTime,
                    'success_count' => $success ? 1 : 0,
                    'error_count'   => $success ? 0 : 1,
                ];
                Db::table('statistic_project_transfer_interval')->insert($data);
                //设置标志
                Redis::setEx('key:' . $key, 300, time());
            }
        }
    }

    /**
     * 应用状态码统计
     */
    public static function statisticProjectCode()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $code = $transferData['code'];
        $key = $transferData['statistic_project_code_key'];
        if (Redis::exists('key:' . $key)) {
            goto update_db;
        }
        //取锁:有效期3秒,自释放
        $lock = StatisticLocker::lock($key, 3, true);

        //多进程阻塞等待
        if ($lock->acquire(true)) {
            $row = Db::table('statistic_project_code')->where('key', $key)->first();
            if ($row) {
                update_db:
                //这里不要用查询出的递增字段的值,因为多进程下存在并发,数据会不准!!
                //最佳实践,根据查询出的数据,拼接$where,而$data使用数据库原生语法
                Db::table('statistic_project_code')
                    ->where('key', $key)
                    ->update([
                        'count' => Db::raw('count + 1'),
                        'cost'  => Db::raw("cost + {$costTime}"),
                    ]);
            } else {
                $data = [
                    'key'     => $key,
                    'day'     => $day,
                    'project' => $project,
                    'type'    => $type,
                    'code'    => $code,
                    'count'   => 1,
                    'cost'    => $costTime,
                ];
                Db::table('statistic_project_code')->insert($data);
                //设置标志
                Redis::setEx('key:' . $key, 300, time());
            }
        }
    }

    /**
     * 应用状态码每分钟统计
     */
    public static function statisticProjectCodeInterval()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $minute = $transferData['minute'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $code = $transferData['code'];
        $key = $transferData['statistic_project_code_interval_key'];
        if (Redis::exists('key:' . $key)) {
            goto update_db;
        }
        //取锁:有效期3秒,自释放
        $lock = StatisticLocker::lock($key, 3, true);

        //多进程阻塞等待
        if ($lock->acquire(true)) {
            $row = Db::table('statistic_project_code_interval')->where('key', $key)->first();
            if ($row) {
                update_db:
                //这里不要用查询出的递增字段的值,因为多进程下存在并发,数据会不准!!
                //最佳实践,根据查询出的数据,拼接$where,而$data使用数据库原生语法
                Db::table('statistic_project_code_interval')
                    ->where('key', $key)
                    ->update([
                        'count' => Db::raw('count + 1'),
                        'cost'  => Db::raw("cost + {$costTime}"),
                    ]);
            } else {
                $data = [
                    'key'     => $key,
                    'day'     => $day,
                    'time'    => $minute,
                    'project' => $project,
                    'type'    => $type,
                    'code'    => $code,
                    'count'   => 1,
                    'cost'    => $costTime,
                ];
                Db::table('statistic_project_code_interval')->insert($data);
                //设置标志
                Redis::setEx('key:' . $key, 300, time());
            }
        }
    }

    /**
     * 应用IP统计
     */
    public static function statisticProjectIp()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $success = 1 === $transferData['success'];
        $ip = $transferData['ip'];
        $key = $transferData['statistic_project_ip_key'];
        if (Redis::exists('key:' . $key)) {
            goto update_db;
        }
        //取锁:有效期3秒,自释放
        $lock = StatisticLocker::lock($key, 3, true);

        //多进程阻塞等待
        if ($lock->acquire(true)) {
            $row = Db::table('statistic_project_ip')->where('key', $key)->first();
            if ($row) {
                update_db:
                //这里不要用查询出的递增字段的值,因为多进程下存在并发,数据会不准!!
                //最佳实践,根据查询出的数据,拼接$where,而$data使用数据库原生语法
                $data = [
                    'count' => Db::raw('count + 1'),
                    'cost'  => Db::raw("cost + {$costTime}"),
                ];
                if ($success) {
                    $data['success_count'] = Db::raw('success_count + 1');
                } else {
                    $data['error_count'] = Db::raw('error_count + 1');
                }
                Db::table('statistic_project_ip')
                    ->where('key', $key)
                    ->update($data);
            } else {
                $data = [
                    'key'           => $key,
                    'day'           => $day,
                    'project'       => $project,
                    'type'          => $type,
                    'ip'            => $ip,
                    'count'         => 1,
                    'cost'          => $costTime,
                    'success_count' => $success ? 1 : 0,
                    'error_count'   => $success ? 0 : 1,
                ];
                Db::table('statistic_project_ip')->insert($data);
                //设置标志
                Redis::setEx('key:' . $key, 300, time());
            }
        }
    }

    /**
     * 应用IP每分钟统计
     */
    public static function statisticProjectIpInterval()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $minute = $transferData['minute'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $success = 1 === $transferData['success'];
        $ip = $transferData['ip'];
        $key = $transferData['statistic_project_ip_interval_key'];
        if (Redis::exists('key:' . $key)) {
            goto update_db;
        }
        //取锁:有效期3秒,自释放
        $lock = StatisticLocker::lock($key, 3, true);

        //多进程阻塞等待
        if ($lock->acquire(true)) {
            $row = Db::table('statistic_project_ip_interval')->where('key', $key)->first();
            if ($row) {
                update_db:
                //这里不要用查询出的递增字段的值,因为多进程下存在并发,数据会不准!!
                //最佳实践,根据查询出的数据,拼接$where,而$data使用数据库原生语法
                $data = [
                    'count' => Db::raw('count + 1'),
                    'cost'  => Db::raw("cost + {$costTime}"),
                ];
                if ($success) {
                    $data['success_count'] = Db::raw('success_count + 1');
                } else {
                    $data['error_count'] = Db::raw('error_count + 1');
                }
                Db::table('statistic_project_ip_interval')
                    ->where('key', $key)
                    ->update($data);
            } else {
                $data = [
                    'key'           => $key,
                    'day'           => $day,
                    'time'          => $minute,
                    'project'       => $project,
                    'type'          => $type,
                    'ip'            => $ip,
                    'count'         => 1,
                    'cost'          => $costTime,
                    'success_count' => $success ? 1 : 0,
                    'error_count'   => $success ? 0 : 1,
                ];
                Db::table('statistic_project_ip_interval')->insert($data);
                //设置标志
                Redis::setEx('key:' . $key, 300, time());
            }
        }
    }

    /**
     * 应用IP状态码统计
     */
    public static function statisticProjectIpCode()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $ip = $transferData['ip'];
        $code = $transferData['code'];
        $key = $transferData['statistic_project_ip_code_key'];
        if (Redis::exists('key:' . $key)) {
            goto update_db;
        }
        //取锁:有效期3秒,自释放
        $lock = StatisticLocker::lock($key, 3, true);

        //多进程阻塞等待
        if ($lock->acquire(true)) {
            $row = Db::table('statistic_project_ip_code')->where('key', $key)->first();
            if ($row) {
                update_db:
                //这里不要用查询出的递增字段的值,因为多进程下存在并发,数据会不准!!
                //最佳实践,根据查询出的数据,拼接$where,而$data使用数据库原生语法

                Db::table('statistic_project_ip_code')
                    ->where('key', $key)
                    ->update([
                        'count' => Db::raw('count + 1'),
                        'cost'  => Db::raw("cost + {$costTime}"),
                    ]);
            } else {
                $data = [
                    'key'     => $key,
                    'day'     => $day,
                    'project' => $project,
                    'type'    => $type,
                    'ip'      => $ip,
                    'code'    => $code,
                    'count'   => 1,
                    'cost'    => $costTime,
                ];
                Db::table('statistic_project_ip_code')->insert($data);
                //设置标志
                Redis::setEx('key:' . $key, 300, time());
            }
        }
    }

    /**
     * 应用IP状态码每分钟统计
     */
    public static function statisticProjectIpCodeInterval()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $minute = $transferData['minute'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $ip = $transferData['ip'];
        $code = $transferData['code'];
        $key = $transferData['statistic_project_ip_code_interval_key'];
        if (Redis::exists('key:' . $key)) {
            goto update_db;
        }
        //取锁:有效期3秒,自释放
        $lock = StatisticLocker::lock($key, 3, true);

        //多进程阻塞等待
        if ($lock->acquire(true)) {
            $row = Db::table('statistic_project_ip_code_interval')->where('key', $key)->first();
            if ($row) {
                update_db:
                //这里不要用查询出的递增字段的值,因为多进程下存在并发,数据会不准!!
                //最佳实践,根据查询出的数据,拼接$where,而$data使用数据库原生语法
                Db::table('statistic_project_ip_code_interval')
                    ->where('key', $key)
                    ->update([
                        'count' => Db::raw('count + 1'),
                        'cost'  => Db::raw("cost + {$costTime}"),
                    ]);
            } else {
                $data = [
                    'key'     => $key,
                    'day'     => $day,
                    'time'    => $minute,
                    'project' => $project,
                    'type'    => $type,
                    'ip'      => $ip,
                    'code'    => $code,
                    'count'   => 1,
                    'cost'    => $costTime,
                ];
                Db::table('statistic_project_ip_code_interval')->insert($data);
                //设置标志
                Redis::setEx('key:' . $key, 300, time());
            }
        }
    }

    /**
     * 应用IP调用统计
     */
    public static function statisticProjectIpTransfer()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $ip = $transferData['ip'];
        $transfer = $transferData['transfer'];
        $success = 1 === $transferData['success'];
        $key = $transferData['statistic_project_ip_transfer_key'];
        if (Redis::exists('key:' . $key)) {
            goto update_db;
        }
        //取锁:有效期3秒,自释放
        $lock = StatisticLocker::lock($key, 3, true);

        //多进程阻塞等待
        if ($lock->acquire(true)) {
            $row = Db::table('statistic_project_ip_transfer')->where('key', $key)->first();
            if ($row) {
                update_db:
                //这里不要用查询出的递增字段的值,因为多进程下存在并发,数据会不准!!
                //最佳实践,根据查询出的数据,拼接$where,而$data使用数据库原生语法

                $data = [
                    'count' => Db::raw('count + 1'),
                    'cost'  => Db::raw("cost + {$costTime}"),
                ];
                if ($success) {
                    $data['success_count'] = Db::raw('success_count + 1');
                } else {
                    $data['error_count'] = Db::raw('error_count + 1');
                }
                Db::table('statistic_project_ip_transfer')
                    ->where('key', $key)
                    ->update($data);
            } else {
                $data = [
                    'key'           => $key,
                    'day'           => $day,
                    'project'       => $project,
                    'type'          => $type,
                    'ip'            => $ip,
                    'transfer'      => $transfer,
                    'count'         => 1,
                    'cost'          => $costTime,
                    'success_count' => $success ? 1 : 0,
                    'error_count'   => $success ? 0 : 1,
                ];
                Db::table('statistic_project_ip_transfer')->insert($data);
                //设置标志
                Redis::setEx('key:' . $key, 300, time());
            }
        }
    }

    /**
     * 应用IP调用每分钟统计
     */
    public static function statisticProjectIpTransferInterval()
    {
        $transferData = self::$transferData;
        $day = $transferData['day'];
        $minute = $transferData['minute'];
        $project = $transferData['project'];
        $type = $transferData['type'];
        $costTime = $transferData['costTime'];
        $ip = $transferData['ip'];
        $transfer = $transferData['transfer'];
        $success = 1 === $transferData['success'];
        $key = $transferData['statistic_project_ip_transfer_interval_key'];

        if (Redis::exists('key:' . $key)) {
            goto update_db;
        }
        //取锁:有效期3秒,自释放
        $lock = StatisticLocker::lock($key, 3, true);

        //多进程阻塞等待
        if ($lock->acquire(true)) {
            $row = Db::table('statistic_project_ip_transfer_interval')->where('key', $key)->first();
            if ($row) {
                update_db:
                //这里不要用查询出的递增字段的值,因为多进程下存在并发,数据会不准!!
                //最佳实践,根据查询出的数据,拼接$where,而$data使用数据库原生语法
                $data = [
                    'count' => Db::raw('count + 1'),
                    'cost'  => Db::raw("cost + {$costTime}"),
                ];
                if ($success) {
                    $data['success_count'] = Db::raw('success_count + 1');
                } else {
                    $data['error_count'] = Db::raw('error_count + 1');
                }
                Db::table('statistic_project_ip_transfer_interval')
                    ->where('key', $key)
                    ->update($data);
            } else {
                $data = [
                    'key'           => $key,
                    'day'           => $day,
                    'time'          => $minute,
                    'project'       => $project,
                    'type'          => $type,
                    'ip'            => $ip,
                    'transfer'      => $transfer,
                    'count'         => 1,
                    'cost'          => $costTime,
                    'success_count' => $success ? 1 : 0,
                    'error_count'   => $success ? 0 : 1,
                ];
                Db::table('statistic_project_ip_transfer_interval')->insert($data);
                //设置标志
                Redis::setEx('key:' . $key, 300, time());
            }
        }
    }
}

上述代码经过测试,300条数据,单进程耗时20.487082958221436s,cpu数进程耗时21.118263483047485s,cpu*4进程数耗时22.74993324279785s,对进程处理并没有加快反而由于锁的存在多耗时了;大佬帮看看我上面的代码,我准备放弃了,改用redis+定时器扫描批量入库了

  • 864328615 2023-08-10

    这个结果不稳定,总之很慢,在考虑用定时器执行一次从redis队列取出100条,处理批量入库

  • efnic 2023-08-11

    盲猜:10张表,你都用StatisticLocker::lock($key, 3, true);加锁,搞成共享锁了。错啦,兄弟

  • 小W 2023-08-11

    他代码里protected static function assembleData()组装了每个表的key

  • efnic 2023-08-11

    每张表的key,加索引了吗?没加索引的话,巨慢。

  • 小W 2023-08-11

    当事人不出来澄清,不好说,也有可能每秒更新很多次,有key也承受不了

  • efnic 2023-08-11

    确实,最优解还是合并计算、批量插入数据库效率最高。

  • efnic 2023-08-11

    我写过一个业务,与交易所建立长连接,交易所长连接下发数据很频繁,一条条保存4个进程都积压。搞成批量插入,轻松无压力。

  • 864328615 2023-08-11

    表里测试完 数据会清空,所以目前还没加索引,想着量不大,原来检索数据需要好多条件,我加个key,使用一个了

  • 小W 2023-08-11

    该来该去,不如单进程效率高

小W

//取锁:有效期3秒,自释放
$lock = StatisticLocker::lock($key, 3, true);

数据库频繁的插入更新可能导致每次锁的释放时间增加;

根据业务逻辑,insert只有一次,update可能存在很多次,这一块是造成数据库压力的原因,所以建议把update部分的内容放到redis队列,其他逻辑保持不变。

  • 暂无评论
efnic

那个锁插件的基类代码:

class Locker
{
    public static function __callStatic($name, $arguments)
    {
        $key = $arguments[0] ?? '';
        unset($arguments[0]);
        return static::createLock($name . $key, ...$arguments);
    }

而你用的都是这一个方法,key的取值是否有重复,不得而知。

//取锁:有效期3秒,自释放
$lock = StatisticLocker::lock($key, 3, true);

每一个表的操作是不能共享锁的。也要根据业务的特性,ttl值也是有讲究的。
比如每天、每分钟、每小时才创建一次的,要根据时间合理调整ttl值。
总体原则是:创建时候才加锁,多进程同步更新。
我记得我给你写了很多个方法的啊?为的就是避免锁重复

  • efnic 2023-08-11

    最优解还是内存计算,批量保存进数据库。

  • 864328615 2023-08-11

    当时,很多方法的时候出现了重复入库的问题,我没研究那个锁的原理 想着是不是方法不一造成的 就改成一个了

  • 864328615 2023-08-11

    算了 我这代码太垃圾了 不值得再探讨了,简单测试数据承载量太低了,这段代码只是实现了业务上的逻辑,实际中根本无法承载业务需求

  • efnic 2023-08-11

    重复入库是因为数据库压力大,3秒后锁自动释放了。因为是自释放锁,所以可适当延长锁时间比如30秒。

  • efnic 2023-08-11

    各个数据表,对key做索引就行了。

  • efnic 2023-08-11

    10张表,就需要10个锁。

  • 864328615 2023-08-11

    我在测试下 大佬 感谢

  • 864328615 2023-08-11

    测试300数据处理 8进程 时间float(20.861277103424072)根之前差不多 这个处理速度太慢了,而且造成数据库压力很大,目前还是没数据的状态,当数据库数据量大的时候,这个处理时间会根漫长,还有key字段加了索引

  • efnic 2023-08-11

    还是一步到位,改为批量插入吧;

efnic

每一个方法名,表示一个锁的前缀,前缀也是锁key的一部分(可看源码);
意思就是前缀不同、key值相同,也是不同的锁。

/**
 * 统计锁
 * @method static LockInterface lock(string $day, ?float $ttl = null, ?bool $autoRelease = null) 数据库操作锁
 * @method static LockInterface project(string $project, ?float $ttl = null, ?bool $autoRelease = null) 应用锁
 * @method static LockInterface statistic(string $day, ?float $ttl = null, ?bool $autoRelease = null) 总统计锁
 * @method static LockInterface statisticProject(string $day_project_type, ?float $ttl = null, ?bool $autoRelease = null) 应用总统计
 * @method static LockInterface statisticProjectInterval(string $day_minute_project_type, ?float $ttl = null, ?bool $autoRelease = null) 应用每分钟统计
 * @method static LockInterface statisticProjectTransfer(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用调用统计
 * @method static LockInterface statisticProjectTransferInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用调用每分钟统计
 * @method static LockInterface statisticProjectCode(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用状态码统计
 * @method static LockInterface statisticProjectCodeInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用状态码每分钟统计
 * @method static LockInterface statisticProjectIp(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用IP统计
 * @method static LockInterface statisticProjectIpInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用IP每分钟统计
 * @method static LockInterface statisticProjectIpCode(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用IP状态码统计
 * @method static LockInterface statisticProjectIpCodeInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用IP状态码每分钟统计
 * @method static LockInterface statisticProjectIpTransfer(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用IP调用统计
 * @method static LockInterface statisticProjectIpTransferInterval(string $key, ?float $ttl = null, ?bool $autoRelease = null) 应用IP调用每分钟统计
 */
class StatisticLocker extends Locker
{

}
  • 暂无评论
软饭工程师

我们之前访问量不大,直接是使用事务,保证数据的一致性,我问了ai,它让我使用行锁

use Illuminate\Support\Facades\DB;

// 开启事务
DB::beginTransaction();

try {
    // 使用行级锁查询数据
    $row = DB::table('statistic')->where('day', $day)->lockForUpdate()->first();

    if ($row) {
        $data = [
            'count'         => $row->count + 1,
            'cost'          => $row->cost + $costTime,
            'success_count' => $row->success_count + ($success ? 1 : 0),
            'error_count'   => $row->error_count + ($success ? 0 : 1),
        ];
        DB::table('statistic')->where('day', $day)->update($data);
    } else {
        $data = [
            'day'           => $day,
            'count'         => 1,
            'cost'          => $costTime,
            'success_count' => $success ? 1 : 0,
            'error_count'   => $success ? 0 : 1,
        ];
        DB::table('statistic')->insert($data);
    }

    // 提交事务
    DB::commit();
} catch (\Exception $e) {
    // 回滚事务
    DB::rollback();
    // 处理异常
}
  • 864328615 2023-08-11

    那个异常用Throwable,感觉你这么写很容易出现死锁的问题,还有我记得事务本身就会加锁啊,你代码里又加了锁

  • 864328615 2023-08-11

    原来是我写的==

  • 软饭工程师 2023-08-11

    mysql 开启事务查询和插入不会加锁,但是更新和删除确实会加锁,where 条件如果有索引会锁行,如果没有索引会锁表

  • 864328615 2023-08-11

    受教了

a784910468

队列消费不是应该消出库就没了,不存在二次消费吧

  • 864328615 2023-08-11

    不存在,只是我队列里数据有的是重复的 设计更新问题,只是插入的话用队列完全没问题,但是正如上面大佬说的,最高内存计算 批量入库减少数据库压力,也就是仅仅入库的话使用队列并不是完美,队列还是根一些发邮件啊 之类的业务更匹配

meows

你这种情况是多进程并发所致,你的SQL语句并非原子操作。
想要解决这个问题,你只能使用MySQL 自带SQL语句来实现原子操作。
这篇文章能解决这些问题,https://www.ngui.cc/el/1890018.html?action=onClick

建议采用这条SQL写法:ON DUPLICATE KEY UPDATE

  • 暂无评论
年代过于久远,无法发表回答
×
🔝