python

当前位置:首页 > SEO工具 > 当前文章

SEO工具

python百度mo域名首页覆盖率查询

2020-08-24 157赞 python中国网
每篇文章努力于解决一个问题!python高级、python面试全套、操作系统经典课等可移步文章底部。

  判断一个行业哪些站做的好(流量大)的做法

  1、找一批行业词库,抓取每个词百度排名前10的url。假设1万个词那么得到10万个url;

  2、从10万个url中提取域名,统计下各个域名出现次数;

  3、某域名的出现次数/10万 来计算各域名首页覆盖率;

  4、覆盖率高的则为高流量站点。

  【ps:如果长期记录统计,观察发现哪些站降权,哪些站暴涨】。

  百度反爬提示

  百度会封UA、封cookie,封ip则比较少见。线程数默认是1;现在百度反爬比之前严重,线程数最好保持为1。【多线程写同一个文件需要加锁,否则可能数据错乱】

  最恶心的是爬虫得来的页面和实际搜索的页面不同(原因如下)!

  因为直接拼接搜索url进行访问,没有用户真实的鼠标动作,无法触发gif请求,所以容易被识别。不加cookie且抓取量大时,返回的页面虽然看似正常,但内容和实际搜索结果不同;如果不加cookie但有鼠标动作,则不会被反爬。

  脚本功能(集域名覆盖率+目标域名首页词词数为一体):

  1)指定几个域名,分关键词种类监控目标站点首页词数

  2)采集serp所有url,提取域名并统计各域名首页覆盖率

  脚本规则:

  1)相关网站.相关企业.智能小程序.其他人还在搜.热议聚合.资讯聚合.搜索智能聚合.视频全部算在内

  所以首页排名有可能出现大于10

  2)serp上自然排名mu属性值为排名url,特殊样式mu为空或不存在,

  提取article里url,该url是baidu域名,二次访问才能获得真实url,本脚本直接取baidu链接

  3)2020xiaoqu_kwd_city.xlsx:sheet名为关键词种类,sheet第一列放关键词

  脚本结果:

  bdmo1_index_info.txt:各监控站点词的排名及url,如有2个url排名,只取第一个

  bdmo1_index_all.txt:serp所有url及样式特征,依此统计各域名首页覆盖率-单写脚本(bdmo1_tj.py)完成

  bdmo1_index.xlsx:自己站每类词首页词数

  bdmo1_index_domains.xlsx:各监控站点每类词的首页词数

  bdmo1_index_domains.txt:各监控站点每类词的首页词数

  cookie必须是登录baidu账号后的cookie否则很容易被反爬

# -*- coding: utf-8 -*-
"""
功能:
   1)指定几个域名,分关键词种类监控首页词数
   2)采集serp所有url,提取域名并统计各域名首页覆盖率
提示:
  1)相关网站.相关企业.智能小程序.其他人还在搜.热议聚合.资讯聚合.搜索智能聚合.视频全部算在内
    所以首页排名有可能大于10
  2)serp上自然排名mu属性值为排名url,特殊样式mu为空或不存在,
    提取article里url,该url是baidu域名,二次访问才能获得真实url,本脚本直接取baidu链接
  3)kwd_core_city.xlsx:sheet名为关键词种类,sheet第一列放关键词
结果:
    bdmo1_index_info.txt:各监控站点词的排名及url,如有2个url排名,只取第一个
    bdmo1_index_all.txt:serp所有url及样式特征,依此统计各域名首页覆盖率-单写脚本完成
    bdmo1_index.xlsx:自己站每类词首页词数
    bdmo1_index_domains.xlsx:各监控站点每类词的首页词数
    bdmo1_index_domains.txt:各监控站点每类词的首页词数
cookie必须是登录baidu账号后的cookie否则很容易被反爬

"""

import requests
from pyquery import PyQuery as pq
import threading
import queue
import time
from urllib.parse import urlparse
from openpyxl import load_workbook
from openpyxl import Workbook
import time
import gc
import json
import random

# 计算最终结果
def get_result(file_path, result):
    """Tally home-page hits and total keyword counts from the ranking info file.

    Every record is expected to end with three whitespace-separated fields:
    rank, keyword group and monitored domain (preceded by the keyword and the
    ranking URL).  Fields are taken from the end of the line so that the
    parse works for tab- or space-separated records alike and is not thrown
    off by a keyword that itself contains whitespace.

    Args:
        file_path: Path of the UTF-8 info file to read.
        result: Nested dict ``result[domain][group]`` holding the counters
            ``'首页'`` and ``'总词数'``; it is updated in place.

    Returns:
        The same ``result`` dict that was passed in.

    Raises:
        KeyError: If a record names a domain or group missing from ``result``.
    """
    with open(file_path, 'r', encoding='utf-8') as f_info:
        for line in f_info:
            fields = line.split()
            # Blank or truncated lines carry no usable record.
            if len(fields) < 5:
                continue
            rank, group, domain = fields[-3:]
            counters = result[domain][group]
            # '无' is the marker written when the domain has no ranking.
            if rank != '无':
                counters['首页'] += 1
            counters['总词数'] += 1
    return result


# 写txt,所有监控域名的结果
def write_domains_txt(result_last):
    """Write the per-domain, per-group counters to a dated text report.

    The output file name is prefixed with the module-level ``today`` value.
    A header row is written first, then one row per (domain, group) pair
    containing the date, domain, group and every counter value of that group.

    Args:
        result_last: Nested dict ``{domain: {group: {counter_name: value}}}``.
    """
    with open('{0}bdmo1_index_domains.txt'.format(today), 'w', encoding="utf-8") as f_res:
        # The line terminators had been lost from these literals, leaving them
        # unterminated; they are restored as explicit "\n" escapes.
        f_res.write('{0}    {1} {2} {3} {4}\n'.format('日期','域名','词类','首页词数','查询词数'))
        for now_domain, dict_value in result_last.items():
            for group, dict_index_all in dict_value.items():
                f_res.write('{0}    {1} {2} '.format(today,now_domain,group))
                for value in dict_index_all.values():
                    f_res.write(str(value) + '  ')
                f_res.write('\n')


# 写excel
def write_myexcel(group_list, result_last, today,my_domain):
    """Save the counters into two dated Excel workbooks, one sheet per group.

    ``{today}bdmo1_index.xlsx`` receives only the rows of ``my_domain``;
    ``{today}bdmo1_index_domains.xlsx`` receives the rows of every domain.

    Args:
        group_list: Keyword group names; one sheet is created for each.
        result_last: Nested dict ``{domain: {group: {counter_name: value}}}``.
        today: Date string used in each row and as the file-name prefix.
        my_domain: The domain whose rows go into the first workbook.
    """
    wb = Workbook()
    wb_all = Workbook()
    # Create the sheets and their header rows.  The position counter used to
    # be reset to 0 on every iteration, so each sheet was inserted at index 0
    # and the tabs ended up in reverse order; enumerate keeps group order.
    for sheet_num, group in enumerate(group_list):
        title = u'{0}'.format(group)
        wb.create_sheet(title, index=sheet_num)
        wb_all.create_sheet(title, index=sheet_num)
        wb[title].append(['日期', '首页', '总词数'])
        wb_all[title].append(['日期', '域名','首页', '总词数'])
    # Fill in one data row per (domain, group).
    for domain, dict_value in result_last.items():
        for group, dict_index_all in dict_value.items():
            title = u'{0}'.format(group)
            counts = list(dict_index_all.values())
            if domain == my_domain:
                wb[title].append([today] + counts)
            wb_all[title].append([today, domain] + counts)
    wb.save('{0}bdmo1_index.xlsx'.format(today))
    wb_all.save('{0}bdmo1_index_domains.xlsx'.format(today))

# 发js包-不用
def request_js(url,my_header,retry=1):
    """Issue a best-effort GET to ``url``; the response is discarded.

    On any request error the failure is printed, the function sleeps for six
    seconds and tries again until the retry budget is used up.  The earlier
    recursive retry passed ``retry - 1`` in the ``my_header`` position, which
    dropped the headers and never reduced the retry budget.

    Args:
        url: Address to request.
        my_header: Header dict sent with the request.
        retry: Number of additional attempts allowed after a failure.

    Returns:
        None in every case; errors are reported on stdout, never raised.
    """
    while True:
        try:
            requests.get(url=url, headers=my_header, timeout=2)
        except Exception as e:
            print('获取源码失败',e)
            time.sleep(6)
            if retry <= 0:
                return
            retry -= 1
        else:
            return

# 随机header
def get_header():
    """Return a fresh dict of the HTTP request headers used for m.baidu.com.

    The values are fixed (nothing is randomised); a new dict is built on
    every call so callers may modify their copy freely.
    """
    header_items = [
        ('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9'),
        ('Accept-Encoding', 'gzip, deflate, br'),
        ('Accept-Language', 'zh-CN,zh;q=0.9,en;q=0.8'),
        ('Cache-Control', 'no-cache'),
        ('Connection', 'keep-alive'),
        # Cookie captured from a logged-in browser session.
        ('Cookie', 'wpr=0; BDICON=10123156; ___rl__test__cookies=1582298586348; __cfduid=db1889b7d4272171df5d2b0ed76dbdacc1562148359; BAIDUID=14E9731020ACEE14821E1A67DABB2862:FG=1; MSA_ZOOM=1056; PSTM=1580184319; BIDUPSID=42DF5CED7B3ED990A9FF7BF52F7B4E0B; plus_cv=1::m:49a3f4a6; MSA_PBT=146; MSA_WH=375_667; plus_lsv=f197ee21ffd230fd; BDUSS=Wc0UmtiM2NrUm1JSG9uWGcxV0FBOHQ5cWMzaGVFaTJUMlpzdW5aZjkzanVJbkplRVFBQUFBJCQAAAAAAAAAAAEAAAAGtiZkNzcyNDgzMjAwZG9uZwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAO6VSl7ulUpeQ; BDORZ=B490B5EBF6F3CD402E515D22BCDA1598; H_WISE_SIDS=141694_142059_135847_139560_128700_132920_142210_141000_139420_142018_141837_140201_136863_138585_141650_142511_140989_140114_140325_140578_133847_140065_141808_131423_142101_141707_107314_139882_141883_140368_140798_137703_141102_110085_142271_138596_142345_138450_138878_137985_140173_131246_137749_138165_138883_140259_141941_127969_140622_140593_140864_138426_141009_138944_141190_141929; rsv_i=5877IuabcUI0ot6ToMtrHALjmHp02ro9rxDWvBNCZyF5ZB5f4lAStpaDjbgFFENpNJs3o1uxQqlSFRFFBkh26aIumA7gm7k; ___rl__test__cookies=1582298556002; SE_LAUNCH=5%3A26371642_0%3A26371642; BDICON=10123156; BDPASSGATE=IlPT2AEptyoA_yiU4VKH3kIN8efjWvW4AfvESkplQFStfCaWmhH3BrVMSEnHN-a81iTM-Y3fo_-5kV8PQlxmicwdggYTpW-H7CG-zNSF5aTvL_NgzbIZCb4jQUZqqvzxkgl-zuEHQ49zBCstpBTbpuo4ivKl73JQb4r56kCygKrl_oGm2X8My88bOXVfYZ0APNu594rXnEpKLDm4Yt_tT9PecSIMR7QEy0bgd_stOOr-sjILHe8sZ1FOD78vJpFeXAio1fWU60ul269v5HViViwh9UOI7u46MnJZ; delPer=0; BDORZ=SFH; COOKIE_SESSION=0_0_0_1_0_w1_0_1_0_0_0_0_2_1582298558%7C1%230_0_0_0_0_0_0_0_1582298558%7C1; ASUV=1.2.126; ysm=10303|10303; FC_MODEL=-1_0_17_0_0_0_0_0_0_0_0_-1_7_20_4_25_0_1582298569522_1582298558495%7C9%230_-1_-1_7_4_1582298569522_1582298558495%7C9; BDSVRTM=46; PSINO=1; BDSVRBFE=Go; __bsi=11504946712979853793_00_14_R_R_1_0303_c02f_Y; OUTFOX_SEARCH_USER_ID_NCOO=390990579.0610157'),
        ('Host', 'm.baidu.com'),
        ('Pragma', 'no-cache'),
        ('Referer', 'https://m.baidu.com/ssid=06b6373732343833323030646f6e672664/s?word=aaa&sa=tb&ts=8573018&t_kt=0&ie=utf-8&rsv_t=138c6EhijUR%252FBZby32o%252F7R6O%252F8jrg3gEeZx03hJXFZIjcgP0SX77&rsv_pq=10865724986727548274&ss=&rqlang=zh&oq=aaa'),
        ('Sec-Fetch-Mode', 'navigate'),
        ('Sec-Fetch-Site', 'same-origin'),
        ('Sec-Fetch-User', '?1'),
        ('Upgrade-Insecure-Requests', '1'),
        ('User-Agent', 'Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Mobile/15A372 Safari/604.1'),
    ]
    return dict(header_items)



class bdmoIndexMonitor(threading.Thread):
    """Worker thread that queries Baidu mobile search for queued keywords.

    For each keyword it records every ranked URL of the result page and, per
    monitored domain, the first ranking URL (or a "no ranking" marker).

    The worker relies on module-level names created by the script entry
    point: ``q`` (keyword queue), ``domains`` (monitored domains), ``f`` and
    ``f_all`` (open output files), plus the helper functions ``get_header``
    and ``request_js``.
    """

    def __init__(self):
        threading.Thread.__init__(self)

    @staticmethod
    def read_excel(filepath):
        """Load keywords from a workbook into a queue.

        Each sheet name is a keyword group and column A of the sheet holds
        the keywords; empty cells are skipped.

        Returns:
            A ``(queue, group_list)`` pair where the queue holds
            ``[sheet_name, keyword]`` items and ``group_list`` is the list of
            sheet names in workbook order.
        """
        kwd_queue = queue.Queue()
        group_list = []
        wb_kwd = load_workbook(filepath)
        for sheet_obj in wb_kwd:
            sheet_name = sheet_obj.title
            group_list.append(sheet_name)
            for cell in sheet_obj['A']:
                kwd = cell.value
                if kwd:
                    kwd_queue.put([sheet_name, kwd])
        return kwd_queue, group_list

    @staticmethod
    def result_init(group_list):
        """Build the zeroed counter dict ``result[domain][group]``.

        Domains come from the module-level ``domains`` list.
        """
        result = {}
        for domain in domains:
            result[domain] = {}
            for group in group_list:
                result[domain][group] = {'首页':0,'总词数':0}
        print("结果字典init...")
        return result

    def get_html(self,url,my_header,retry=1):
        """Fetch the result page for one keyword.

        Args:
            url: Search URL to request.
            my_header: Header dict sent with the request.
            retry: Number of additional attempts allowed after a failure.

        Returns:
            ``(html, final_url)`` on success, where ``final_url`` is the
            address after any redirect; ``None`` once all attempts failed.
        """
        try:
            r = requests.get(url=url,headers=my_header,timeout=5)
        except Exception as e:
            print('获取源码失败',e)
            time.sleep(6)
            if retry > 0:
                # The retry's result must be returned, otherwise a successful
                # second attempt is thrown away.
                return self.get_html(url,my_header,retry-1)
            return None
        # Decode the raw bytes explicitly; r.text sometimes guesses the
        # encoding wrongly.
        html = r.content.decode('utf-8',errors='ignore')
        # A redirect signals anti-crawling, so report the final address.
        return html, r.url

    def get_divs(self, html ,url):
        """Select the result blocks (``.c-result``) of a result page.

        The page is accepted only when its title contains '- 百度' and the
        final URL is still a normal search URL.  Otherwise a warning is
        printed, the thread sleeps for 120 seconds and an empty list is
        returned.

        Returns:
            An iterator over the matching blocks, or ``[]`` when the page is
            abnormal or the selection failed.  NOTE(review): the iterator is
            truthy even when it yields nothing, so callers treat every
            accepted page as "has results" — confirm this is intended.
        """
        div_list = []
        doc = pq(html)
        title = doc('title').text()
        if '- 百度' in title and 'https://m.baidu.com/s?ie=utf-8' in url:
            try:
                div_list = doc('.c-result').items()
            except Exception as e:
                print('提取div块失败', e)
        else:
            print('源码异常---------------------')
            time.sleep(120)
        return div_list

    def get_real_urls(self, div_list):
        """Extract ``(url, rank, style_id)`` tuples from the result blocks.

        The block's ``data-log`` attribute is parsed as JSON after turning
        single quotes into double quotes.  ``mu`` is the ranking URL; when it
        is empty or absent the ``rl-link-href`` of the block's article is
        used instead (that link may be missing, giving ``None``).  ``order``
        is the rank and ``ensrcid`` the style id, defaulting to the literal
        ``'ensrcid'``.

        A block that cannot be parsed is reported and skipped, so one
        malformed block no longer discards the blocks that follow it.
        """
        real_urls_rank = []
        for div in div_list or []:
            try:
                data_log = json.loads(div.attr('data-log').replace("'", '"'))
                srcid = data_log['ensrcid'] if 'ensrcid' in data_log else 'ensrcid'
                rank_url = data_log['mu'] if 'mu' in data_log else ''
                rank = data_log['order']
                if not rank_url:
                    rank_url = div('.c-result-content article').attr('rl-link-href')
            except Exception as e:
                print(e,'提取rank_url error')
                continue
            real_urls_rank.append((rank_url, rank, srcid))
        return real_urls_rank

    def get_domain(self,real_url):
        """Return the network-location part of ``real_url``.

        Returns ``None`` when ``real_url`` is not a string or cannot be
        parsed; a URL without a host yields ``''``.
        """
        # urlparse(None) yields bytes components, which would later break the
        # string join in get_domains, so reject non-string input up front.
        if not isinstance(real_url, str):
            return None
        try:
            res = urlparse(real_url)
        except Exception as e:
            print (e,real_url)
            return None
        return res.netloc

    def get_domains(self,real_url_list):
        """Return the distinct domains of ``real_url_list`` joined by commas.

        A domain with several ranking URLs is listed once.  Unparseable and
        host-less URLs are left out.  The domains are sorted so the result is
        deterministic.
        """
        domain_set = {self.get_domain(real_url) for real_url in real_url_list}
        # set.remove() returns None, so its result must not be assigned back.
        domain_set.discard(None)
        domain_set.discard('')
        return ','.join(sorted(domain_set))

    def run(self):
        """Thread body: process keywords from the queue forever."""
        # The {0}/{1} query placeholders are sent as-is; formatting them with
        # the keyword was disabled in the original script.
        js_url = 'https://fclick.baidu.com/w.gif?baiduid=14E9731020ACEE14821E1A67DABB2862&asp_time=1581297830764&query={0}&queryUtf8={1}&searchid=a0bc28b872b56b7e&osid=1&bwsid=5&adt=0&adb=0&wst=146&top=0&wise=10&middle=0&bottom=0&adpos=t_0_0.00&pbt=146&yxh=0&zoom=1.0555555555555556&validHeight=521&initViewZone=w_1_0.00%3Aw_2_1.00&adsHeight=_w1%3A255_w2%3A255_w3%3A487_w4%3A228_w5%3A204_w6%3A165_w7%3A189_w8%3A255_w9%3A151_w10%3A103&adsCmatch=&availHeight=667&availWidth=375&winHeight=667&winWidth=375&action=init&model=%7B%22vt%22%3A%22w1%3A0%23w2%3A0%23w3%3A0%23w4%3A0%23w5%3A0%23w6%3A0%23w7%3A0%23w8%3A0%23w9%3A0%23w10%3A0%22%2C%22pt%22%3A%22%22%2C%22ext%22%3A%5B%5D%2C%22vsh%22%3A521%2C%22asid%22%3A%22%22%2C%22rd%22%3A1581297833317%7D&tag=ecom_wise_listen_n&rand=1581297833325.636'
        while 1:
            group_kwd = q.get()
            group,kwd = group_kwd
            print(group,kwd)
            try:
                url = "https://m.baidu.com/s?ie=utf-8&word={0}".format(kwd)
                my_header = get_header()
                request_js(js_url,my_header)
                page = self.get_html(url,my_header)
                # get_html gives None when every attempt failed.
                divs_res = self.get_divs(*page) if page else []
                # Only record the keyword when the page was accepted.
                if divs_res:
                    real_urls_rank = self.get_real_urls(divs_res)
                    real_urls = []
                    for my_url,my_order,my_attr in real_urls_rank:
                        real_urls.append(my_url)
                        f_all.write('{0}    {1} {2} {3} {4}\n'.format(kwd,my_url,my_order,my_attr,group))
                    f_all.flush()
                    domain_str = self.get_domains(real_urls)
                    # Record, per monitored domain, its first ranking URL or
                    # the "no ranking" marker.
                    for domain in domains:
                        if domain not in domain_str:
                            f.write('{0}  {1} {2} {3} {4}\n'.format(kwd, '无', '无', group,domain))
                        else:
                            for my_url,my_order,my_attr in real_urls_rank:
                                # my_url is None when a block had no link.
                                if my_url and domain in my_url:
                                    f.write('{0}    {1} {2} {3} {4}\n'.format(kwd,my_url,my_order,group,domain))
                                    print(my_url, my_order)
                                    break  # keep only the first ranking URL
                f.flush()
            except Exception as e:
                print(e)
            finally:
                gc.collect()
                q.task_done()
                time.sleep(0.5)
                

if __name__ == "__main__":
    # Script entry point.  The names bound here (today, domains, q, f, f_all)
    # are module globals that the worker thread and the report writers read.
    start = time.time()
    local_time = time.localtime()
    today = time.strftime('%Y%m%d',local_time)
    domains = ['5i5j.com','lianjia.com','anjuke.com','fang.com']  # monitored domains
    my_domain = '5i5j.com'  # our own domain
    # Keyword queue and the list of keyword groups.
    q,group_list = bdmoIndexMonitor.read_excel('2020kwd_url_core_city_unique.xlsx')
    result = bdmoIndexMonitor.result_init(group_list)  # zeroed counters
    all_num = q.qsize()  # total number of keywords
    # The with-statement closes both output files even if a step below fails.
    with open('{0}bdmo1_index_info.txt'.format(today),'w',encoding="utf-8") as f, \
            open('{0}bdmo1_index_all.txt'.format(today),'w',encoding="utf-8") as f_all:
        file_path = f.name
        # Number of worker threads.
        for i in range(1):
            t = bdmoIndexMonitor()
            # Thread.setDaemon() is deprecated; set the attribute instead.
            t.daemon = True
            t.start()
        # Block until every queued keyword has been marked done.
        q.join()
    # Compute the counters from the info file written above.
    result_last = get_result(file_path,result)
    # Write the text report.
    write_domains_txt(result_last)
    # Write the Excel reports.
    write_myexcel(group_list,result_last,today,my_domain)
    end = time.time()
    print('关键词共{0}个,耗时{1}min'.format(all_num, (end - start) / 60))

测试查询3.6万个小区词,筛选前20的域名出来瞅瞅。

m.baidu.com 15.55%
34689.recommend_list.baidu.com 9.48%
m.anjuke.com 8.87%
baike.baidu.com 8.03%
m.lianjia.com 7.05%
m.fang.com 5.68%
m.5i5j.com 4.95%
mobile.anjuke.com 3.63%
m.focus.cn 3.08%
m.58.com 2.59%
nourl.ubs.baidu.com 2.03%
mpoi.mapbar.com 1.84%
m.ke.com 1.52%
zhidao.baidu.com 1.36%
map.baidu.com 1.13%
m.jiwu.com 1.00%
m.dianping.com 0.76%
m.city8.com 0.60%
m.loupan.com 0.59%
www.anjuke.com 0.55%

附bdmo1_tj.py脚本

# -*- coding: utf-8 -*-
"""
根据bdmo1_index_all.txt数据统计域名覆盖率
sigma.baidu.com:xx_相关网站|xx_相关企业
recommend_list.baidu.com:其他人还在搜
nourl.ubs.baidu.com:搜索智能聚合
bzclk.baidu.com:结构化的展示样式
"""

import requests
from pyquery import PyQuery as pq
import threading
import queue
import time
from urllib.parse import urlparse
import gc
import json
from openpyxl import load_workbook
from openpyxl import Workbook


# 提取某条url域名部分
def get_domain(real_url):
    """Return the network-location part of ``real_url``.

    An empty URL parses without error and yields ``''``.  If parsing raises,
    the error and the URL are printed and the placeholder ``"xxx"`` is
    returned.
    """
    try:
        parsed = urlparse(real_url)
    except Exception as e:
        print (e,real_url)
        return "xxx"
    return parsed.netloc


# 读取文件获取关键词类别
def read_excel(filepath):
    """Return the sheet titles of the workbook at ``filepath``, in order.

    Each sheet title is used as a keyword category by the caller.
    """
    workbook = load_workbook(filepath)
    return [sheet.title for sheet in workbook]

# 初始化结果字典
def result_init(group_list):
    """Create the result dict: one separate empty dict per group name."""
    initial = {name: {} for name in group_list}
    print("结果字典init...")
    return initial

def save():
    """Write the coverage statistics to an Excel workbook and a text file.

    Reads the module-level ``result`` (per-category counts), ``result_all``
    (overall counts), ``city_list``, ``count`` (total number of records) and
    ``today`` (file-name prefix).

    The workbook gets one sheet per category with ``[key, count]`` rows in
    descending count order.  The text file lists every key with its overall
    count and its share of ``count`` formatted as a percentage.
    """
    wb = Workbook()
    # The position counter used to be reset to 0 on every iteration, which
    # inserted each sheet at the front; enumerate keeps category order.
    for sheet_num, city in enumerate(city_list):
        wb.create_sheet(u'{0}'.format(city),index=sheet_num)
    for city, data_dict in result.items():
        sort_dict = sorted(data_dict.items(), key=lambda s: s[1], reverse=True)
        for domain, num in sort_dict:
            wb[u'{0}'.format(city)].append([domain, num])
    wb.save('{0}bdmo1_index_cover.xlsx'.format(today))

    # Text report over all categories combined.
    res_format = sorted(result_all.items(), key=lambda s: s[1], reverse=True)
    with open('{0}bdmo1_domain_res.txt'.format(today),'w',encoding='utf-8') as f:
        for domain, num in res_format:
            # The line terminator had been lost from this literal, leaving it
            # unterminated; it is restored as an explicit "\n" escape.
            f.write(domain+'    '+str(num)+'    ' + str('{:.2%}'.format(num/count)) + '\n')


if __name__ == "__main__":
    # Script entry point.  The names bound here (today, city_list, result,
    # result_all, count) are module globals read by save().
    start = time.time()
    # NOTE(review): the date prefix is pinned to one specific data file; use
    # time.strftime('%Y%m%d') instead to process the file produced today.
    today = '20200210'
    city_list = read_excel('2020xiaoqu_kwd_city_1.xlsx')
    result = result_init(city_list)  # per-category counters
    result_all = {}  # counters over all categories combined

    # Count how often each domain (or style id) occurs.  Records are parsed
    # from the end of the line: ... url, rank, style id, category.  Splitting
    # on any whitespace works for tab- or space-separated files and tolerates
    # keywords that contain spaces.
    count = 0  # number of valid records, the denominator of the coverage
    with open('{0}bdmo1_index_all.txt'.format(today),'r',encoding='utf-8') as fh:
        for raw in fh:
            fields = raw.split()
            if len(fields) < 5:
                continue  # blank or truncated line
            url, style, city = fields[-4], fields[-2], fields[-1]
            # Entries without a real URL are counted under their style id
            # (e.g. rel_ugc).
            key = get_domain(url) if url.startswith('http') else style
            result[city][key] = result[city].get(key, 0) + 1
            result_all[key] = result_all.get(key, 0) + 1
            count += 1
    print(count)

    # Save the statistics.
    save()

    end = time.time()


文章评论

python百度mo域名首页覆盖率查询文章写得不错,值得赞赏