python处理日志数据

需求是这样的:我们需要对日志记录中的日活与新增做统计,每天生成一次统计记录。

源数据由请求接口写入队列,再从队列中取出并异步插入日志表。下面的 Python 脚本从日志表查询数据,计算出统计结果后再写入目标表。

原始日志数据:

409351146956247408    2014-08-30 06:25:46    /money/apple/product/list/IPHONE/api_2    {"api_level":["2"],"app_version":["1.9.6"],"platform":["1"],"api_version":["api_2"],"_key":["87f29b03bb8addddddd0d458da2ff"]}    7366056    49.66.48.198    3    0
409351147959838928    2014-08-30 06:25:46    /money/apple/product/list/IPHONE/api_2    {"api_level":["2"],"app_version":["1.9.6"],"platform":["1"],"api_version":["api_2"],"_key":["2dddddddd0000820d458da2ff"]}    4973658    117.136.19.177    5    0
409351147960207591    2014-08-30 06:25:47    /money/apple/product/list/IPHONE/api_2    {"api_level":["2"],"app_version":["1.9.6"],"platform":["1"],"api_version":["api_2"],"_key":["95e6d2efb31dddddddddd20d458da2ff"]}    3897136    123.151.136.53    3    0

日志表结构(原文此处未给出,以下字段取自下方脚本的查询语句):ID、AccessTime、Uri、Params、UserID、RealIp、TimeSpent、Code,应与上面原始日志数据的各列依次对应。

目标表结构(原文此处未给出,以下字段取自下方脚本的 SQL):tb_app_profile 表,包含 ID、TDate、AppKey、AppValue 四列。

处理程序如下:

# -*- coding: utf-8 -*-
from datetime import *
import MySQLdb
import sys
import time
import datetime
import json
import logging

# Root logger: write INFO-and-above records to tb_app_profile2.log.
# filemode "w" truncates the log file on every run.
# NOTE: with level=INFO the logging.debug(...) calls in this script are dropped.
logging.basicConfig(level=logging.INFO,
                format="%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s",
                datefmt="%a, %d %b %Y %H:%M:%S",
                filename="tb_app_profile2.log",
                filemode="w")

# Also send INFO-and-above records to the console through a second handler
# attached to the root logger, using a shorter line format than the log file.
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter("%(name)-12s: %(levelname)-8s %(message)s")
console.setFormatter(formatter)
logging.getLogger("").addHandler(console)


# Python 2 only: make UTF-8 the implicit str/unicode conversion encoding.
# reload(sys) is needed because sys.setdefaultencoding is removed from the
# module at interpreter startup. Python 3 already defaults to UTF-8 and has
# neither name, so the hack is skipped there.
if sys.version_info[0] == 2:
    reload(sys)
    sys.setdefaultencoding("utf-8")

# Source database (db_access_log): connection and shared cursor used by
# getDataS() to read the raw access-log tables. The connection and session
# are switched to utf8 so text columns round-trip correctly.
# NOTE(review): host, port and credentials are hard-coded; consider moving
# them to configuration.
s_connection = MySQLdb.connect(host="127.0.0.1", port=4277, user="xxx", passwd="xxx$meu78", db="db_access_log")
s_connection.set_character_set("utf8")
s_cursor = s_connection.cursor()
s_cursor.execute("SET NAMES utf8;")
s_cursor.execute("SET CHARACTER SET utf8;")
s_cursor.execute("SET character_set_connection=utf8;")

def getDataS(sql):
    """Run *sql* on the source-database cursor and return every result row.

    The statement is executed exactly as given (no parameter binding), and the
    rows come back in whatever form the cursor's fetchall() produces.
    """
    cursor = s_cursor
    cursor.execute(sql)
    rows = cursor.fetchall()
    return rows

# Target database (db_statistics): connection and shared cursor used by
# getDataT() / setModelT() to read and write the statistics table. The
# connection and session are switched to utf8, mirroring the source side.
# NOTE(review): host, port and credentials are hard-coded; consider moving
# them to configuration.
t_connection = MySQLdb.connect(host="127.0.0.1", port=3361, user="xxx", passwd="xxxxx", db="db_statistics")
t_connection.set_character_set("utf8")
t_cursor = t_connection.cursor()
t_cursor.execute("SET NAMES utf8;")
t_cursor.execute("SET CHARACTER SET utf8;")
t_cursor.execute("SET character_set_connection=utf8;")

def getDataT(sql):
    """Run *sql* on the target-database cursor and return every result row.

    The statement is executed exactly as given (no parameter binding), and the
    rows come back in whatever form the cursor's fetchall() produces.
    """
    cursor = t_cursor
    cursor.execute(sql)
    rows = cursor.fetchall()
    return rows
    
def setModelT(sql):
    """Execute the write statement *sql* on the target-database cursor.

    Nothing is returned and no commit is issued here; the statement is run
    exactly as given (no parameter binding).
    """
    execute = t_cursor.execute
    execute(sql)

def _load_params(raw):
    """Decode a Params column value (JSON text); return None and log on failure."""
    try:
        return json.loads(raw)
    except (TypeError, ValueError) as e:
        logging.error("Exception:%s" % e)
        return None


def _int_param(params, key):
    """Return int(params[key][0]), or None (after logging) if missing or malformed."""
    if params is None:
        # The JSON decode failure was already logged by _load_params.
        return None
    try:
        value = int(params[key][0])
    except (KeyError, IndexError, TypeError, ValueError) as e:
        logging.error("Exception:%s" % e)
        return None
    logging.debug("%s:%d" % (key, value))
    return value


def process(dtstr, max_batches=None):
    """Compute one day's active/new-user statistics and store them.

    Reads the table tb_access_log_<dtstr> from the source database in
    ID-ordered pages of 10000 rows, tallies the counters, then writes nine
    (TDate, AppKey, AppValue) rows to tb_app_profile in the target database:
    an insert when no row exists yet for that day, otherwise one update per key.
    The writes are committed together and rolled back if any of them fails.

    Args:
        dtstr: Day to process as a YYYYMMDD string; used both as the source
            table suffix and as the TDate value.
        max_batches: Optional cap on the number of pages to read (useful for
            debugging). None, the default, reads the whole table. The previous
            hard-coded limit of three pages silently truncated the counts.
    """
    s_table_pre = "tb_access_log_"
    t_table = "tb_app_profile"
    page_size = 10000

    # New users: counters for successful /account/register calls.
    added = {"all": 0, "android": 0, "ios": 0, "female": 0, "male": 0}
    added_devices = set()
    # Active users: distinct UserID values seen in the log.
    active_all = set()
    active_android = set()
    active_ios = set()

    def snapshot():
        # Current (AppKey, value) pairs in the order they are logged and stored.
        return [
            ("user_active_all", len(active_all)),
            ("user_active_android", len(active_android)),
            ("user_active_ios", len(active_ios)),
            ("user_add_alldev", len(added_devices)),
            ("user_add_all", added["all"]),
            ("user_add_android", added["android"]),
            ("user_add_ios", added["ios"]),
            ("user_add_female", added["female"]),
            ("user_add_male", added["male"]),
        ]

    max_id = 0
    batches = 0
    while max_batches is None or batches < max_batches:
        logging.debug("maxId:%d" % max_id)
        logging.debug("flag:%d" % batches)
        batches += 1
        # Keyset pagination: resume after the largest ID already seen.
        sql = ("SELECT ID,AccessTime,Uri,Params,UserID,RealIp,TimeSpent,Code "
               "from %s%s where id > %d order by id limit %d"
               % (s_table_pre, dtstr, max_id, page_size))
        rows = getDataS(sql)
        if not rows:
            break
        max_id = rows[-1][0]

        for row in rows:
            uri, raw_params, user_id, code = row[2], row[3], row[4], row[7]
            active_all.add(user_id)

            # Params is JSON mapping each name to a list of values, e.g.
            # {"platform":["1"], ...}; decode it once per row.
            params = _load_params(raw_params)
            platform = _int_param(params, "platform")
            # NOTE(review): 1 -> android, 2 -> ios is the mapping the original
            # code used; the sample log rows show platform "1" on an /IPHONE/
            # URI, so confirm the mapping against the client.
            if platform == 1:
                active_android.add(user_id)
            elif platform == 2:
                active_ios.add(user_id)

            # Only successful registrations (Code == 0) count as new users.
            if uri != "/account/register" or code != 0:
                continue

            added["all"] += 1
            try:
                added_devices.add(params["device_code"][0])
            except (KeyError, IndexError, TypeError) as e:
                logging.error("Exception:%s" % e)

            if platform == 1:
                added["android"] += 1
            elif platform == 2:
                added["ios"] += 1

            gender = _int_param(params, "gender")
            if gender == 1:
                added["female"] += 1
            elif gender == 2:
                added["male"] += 1

        # Progress report after every page.
        for key, value in snapshot():
            logging.info("%s_num:%d" % (key, value))

    stats = snapshot()
    for key, value in stats:
        logging.debug("%s_num:%d" % (key, value))

    # Look up the target table to decide between insert and update.
    existing = getDataT(
        "select ID,TDate,AppKey,AppValue from %s where TDate='%s'" % (t_table, dtstr))
    try:
        if not existing:
            logging.debug("insert")
            values = ",".join(
                "('%s','%s','%d')" % (dtstr, key, value) for key, value in stats)
            setModelT("insert into %s (TDate,AppKey,AppValue) values %s;"
                      % (t_table, values))
        else:
            logging.debug("update")
            for key, value in stats:
                setModelT(
                    "update %s set AppValue=%d where TDate='%s' and AppKey='%s';"
                    % (t_table, value, dtstr, key))
        # MySQLdb connections do not autocommit by default, so without this
        # the writes are discarded when the connection closes.
        t_connection.commit()
    except Exception:
        t_connection.rollback()
        raise

if __name__ == "__main__":
    # Recompute the statistics for the last seven days (six days ago through
    # today), oldest first. The reference time is read once so the set of days
    # cannot shift if the run crosses midnight.
    today = datetime.datetime.now()
    for days in range(6, -1, -1):
        tdate = today - datetime.timedelta(days=days)
        dtstr = tdate.strftime("%Y%m%d")
        process(dtstr)
View Code

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。