3.5 整合在一起
之前,我们都在 Jupyter 记事本里工作,但现在需要部署应用程序了,我们会切换到文本编辑器中工作。记事本对于探索性分析和可视化是很棒的,不过运行后台作业最好使用一个简单的.py 文件。所以让我们开始吧。
我们将从包的引入开始。如果你尚未安装这些包,可能需要通过 pip 安装它们。
import sys
import pandas as pd
import numpy as np
import requests
from selenium import webdriver
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from bs4 import BeautifulSoup
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
import schedule
import time
接下来,我们将创建一个函数来拉取数据并运行聚类算法。请注意,当我们获取数据的时候,会存在一个明显的等待。这是因为页面有 AJAX 的异步请求,我们需要增加一个等待时间,以确保继续下一步之前页面的定价数据已经被返回。如果由于某种原因,抓取仍然失败,那么这里将发送一个文本作为提醒。
def check_flights():
url = "https://www.google.com/flights/explore/#explore;f=JFK,EWR,LGA;t=HND,NRT,TPE,HKG,KIX;s=1;li=8;lx=12;d=2016-04-01"
driver = webdriver.PhantomJS()
dcap = dict(DesiredCapabilities.PHANTOMJS)
dcap["phantomjs.page.settings.userAgent"] = \
("Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2490.80 Safari/537.36")
driver = webdriver.PhantomJS(desired_capabilities=dcap,service_args=['--ignore-ssl-errors= true'])
driver.get(url)
wait = WebDriverWait(driver, 20)
wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR,"span.FTWFGDB-v-c")))
s = BeautifulSoup(driver.page_source, "lxml")
best_price_tags = s.findAll('div', 'FTWFGDB-w-e')
# 检查爬虫是否工作正常——如果失败了就发送提醒并关闭
if len(best_price_tags) < 4:
print('Failed to Load Page Data')
requests.post('https://maker.ifttt.com/trigger/fare_alert/with/key/MY_SECRE
T_KEY',data={"value1": "script", "value2": "failed","value3": ""})
sys.exit(0)
else:
print('Successfully Loaded Page Data')
best_prices = []
for tag in best_price_tags:
best_prices.append(int(tag.text.replace('$', '')))
best_price = best_prices[0]
best_height_tags = s.findAll('div', 'FTWFGDB-w-f')
best_heights = []
for t in best_height_tags:
best_heights.append(float(t.attrs['style']
.split('height:')[1].replace('px;', '')))
best_height = best_heights[0]
# 每个高度像素对应的价格
pph = np.array(best_price)/np.array(best_height)
cities = s.findAll('div', 'FTWFGDB-w-o')
hlist = []
for bar in cities[0].findAll('div', 'FTWFGDB-w-x'):
hlist.append(float(bar['style'].split('height: ')[1].replace('px;', '')) * pph)
fares = pd.DataFrame(hlist, columns=['price'])
px = [x for x in fares['price']]
ff = pd.DataFrame(px, columns=['fare']).reset_index()
# 开始聚类
X = StandardScaler().fit_transform(ff)
db = DBSCAN(eps=1.5, min_samples=1).fit(X)
labels = db.labels_
clusters = len(set(labels))
pf = pd.concat([ff, pd.DataFrame(db.labels_, columns= ['cluster'])],axis=1)
rf = pf.groupby('cluster')['fare'].agg(['min', 'count']).sort_values('min', scending=True)
现在我们将检查规则是否被触发。如果是,就发送接收文本的请求。
#设置我们的规则
#必须有多于 1 个的聚类
#聚类的最小值必须等于最低的票价
#聚类的大小必须小于全部数量的百分之 10
# 聚类必须比次低的价格聚类少 100 美元及以上
if clusters > 1\
and ff['fare'].min() == rf.iloc[0]['min']\
and rf.iloc[0]['count'] < rf['count'].quantile(.10)\
and rf.iloc[0]['fare'] + 100 < rf.iloc[1]['fare']:
city = s.find('span', 'FTWFGDB-v-c').text
fare = s.find('div', 'FTWFGDB-w-e').text
requests.post('https://maker.ifttt.com/trigger/fare_alert/with/key/MY_SECRE
T_KEY', data={"value1": city, "value2": fare, "value3": ""})
else:
print('no alert triggered')
最终,我们将设置一个调度器。它每 60 分钟运行一次这里的代码。
#设置调度器,每 60 分钟运行一次我们的代码
schedule.every(60).minutes.do(check_flights)
while 1:
schedule.run_pending()
time.sleep(1)
这样做应该就完工了。我们现在可以将其保存为 fare_alerter.py,并在命令行中运行它。它将持续运行并每 60 分钟检查一次票价。如果出现了错误的机票,我们将是第一个知道的!
请注意,这是此类代码的最基本实现。为了创建一个合理的实现,应该使用良好的日志记录来代替这里列出的打印语句。有关如何实现日志记录的更多信息,请参见 https://docs.python.org/3.4/howto/logging.html#logging-basic-tutorial。
本书评论