1.웹크롤링 카페(완성)
2.API 수정
3.웹크롤링 csv 합치기 (store)/ 카페 완성
4.웹크롤링 숙소
1.웹크롤링 카페(완성)
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
import json
import time
from time import sleep
import random
import re
# 크롤링 데이터 저장용 리스트
store_data = []
# 왼쪽 iframe 전환
def switch_left():
driver.switch_to.parent_frame()
iframe = driver.find_element(By.XPATH, '//*[@id="searchIframe"]')
driver.switch_to.frame(iframe)
# 오른쪽 iframe 전환
def switch_right():
driver.switch_to.parent_frame()
iframe = driver.find_element(By.XPATH, '//*[@id="entryIframe"]')
driver.switch_to.frame(iframe)
options = webdriver.ChromeOptions()
options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3')
options.add_argument('window-size=1380,900')
driver = webdriver.Chrome(options=options)
# 네이버 지도 URL
# 지도에 검색한 키워드 URL을 아래 복붙붙
driver.get(url=URL)
driver.implicitly_wait(5)
# 한 번에 끝까지 스크롤
def scroll_to_bottom():
last_height = driver.execute_script("return document.body.scrollHeight") # 초기 높이 가져오기
while True:
# 페이지 끝까지 스크롤
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
sleep(2) # 로딩 대기
new_height = driver.execute_script("return document.body.scrollHeight") # 새로운 높이 가져오기
if new_height == last_height: # 높이가 더 이상 변하지 않으면 종료
break
last_height = new_height
# 1000px씩 끊어서 스크롤
def scroll_to_1000px(scrollable_element):
last_height = driver.execute_script("return arguments[0].scrollHeight", scrollable_element) # 초기 높이 가져오기
while True:
# 요소 내에서 아래로 1000px 스크롤
driver.execute_script("arguments[0].scrollTop += 1000;", scrollable_element)
sleep(2) # 로딩 대기
new_height = driver.execute_script("return arguments[0].scrollHeight", scrollable_element) # 새로운 높이 가져오기
if new_height == last_height: # 높이가 더 이상 변하지 않으면 종료
break
last_height = new_height
# 누적 크롤링 시간 확인용 변수
total_elapsed_time = 0
#####중간에 멈췄을 경우 아래 코드 실행#####
switch_left()
#페이지 이동 필요만큼 아래 두 행 복붙
driver.find_element(By.XPATH, '//*[@id="app-root"]/div/div[2]/div[2]/a[7]').click()
sleep(1)
driver.find_element(By.XPATH, '//*[@id="app-root"]/div/div[2]/div[2]/a[7]').click()
sleep(1)
driver.find_element(By.XPATH, '//*[@id="app-root"]/div/div[2]/div[2]/a[7]').click()
sleep(1)
driver.find_element(By.XPATH, '//*[@id="app-root"]/div/div[2]/div[2]/a[7]').click()
sleep(1)
while True:
switch_left()
# 맨 밑까지 스크롤 (1000px 씩 끊어서)
scrollable_element = driver.find_element(By.CLASS_NAME, "Ryr1F")
scroll_to_1000px(scrollable_element)
# 현재 페이지 번호
page_no = driver.find_element(By.XPATH, '//a[contains(@class, "mBN2s qxokY")]').text
# n번째 가게부터 시작
# #####처음 실행할 경우#####
# elements = driver.find_elements(By.XPATH, '//*[@id="_pcmap_list_scroll_container"]//li')
# #####중간에 멈춰서 다시 실행할 경우 아래 코드의 숫자 수정 필요#####
elements = driver.find_elements(By.XPATH, '//*[@id="_pcmap_list_scroll_container"]//li')[0:] if page_no == '5' else driver.find_elements(By.XPATH, '//*[@id="_pcmap_list_scroll_container"]//li')
for index, e in enumerate(elements, start=1):
# 광고 태그
is_ad = False
# 광고면 is_ad = True 아니면 기존값 False
try:
ad_tag = e.find_element(By.XPATH, './/span[@class="place_blind" and text()="광고"]')
if ad_tag:
is_ad = True
except:
pass
# 데이터 초기화
store_info = {
'index' : index,
'page' : page_no,
'store_name' : '',
'category' : '',
'is_ad' : is_ad,
'rating' : 0.0,
'visited_review' : 0,
'blog_review' : 0,
'address' : '',
'business_hours' : [],
'phone_num' : '',
'menu' : [],
'top_reviews' : [],
'theme_keywords' : {'분위기': [], '인기토픽': [], '찾는목적': []},
}
# 크롤링 시간 확인용
start_time = time.time()
# 가게 클릭 및 상세 페이지 이동
# e.find_element(By.CLASS_NAME, 'CHC5F').find_element(By.XPATH, ".//a/div/div/span").click()
clickable_element = e.find_element(By.CLASS_NAME, 'CHC5F').find_element(By.XPATH, ".//a/div/div/span[1]")
if is_ad == True:
# 요소의 위치 가져오기
location = clickable_element.location
size = clickable_element.size
# 광고 태그 크기 확인
ad_tag_width = ad_tag.size['width']
# 약간 오른쪽으로 위치 조정 (x_offset 값 수정 가능)
x_offset = ad_tag_width + 5 # 가장 왼쪽에서 광고 태그만큼 n px 이동
y_offset = size['height'] // 2 # 요소의 세로 중심
# ActionChains로 클릭 위치 이동
actions = ActionChains(driver)
actions.move_to_element_with_offset(clickable_element, x_offset, y_offset).click().perform()
else:
clickable_element.click()
sleep(2)
switch_right()
try:
# 상세 정보 크롤링
title = driver.find_element(By.XPATH, '//div[@class="zD5Nm undefined"]')
store_info['store_name'] = title.find_element(By.XPATH, './/div[1]/div[1]/span[1]').text
store_info['category'] = title.find_element(By.XPATH, './/div[1]/div[1]/span[2]').text
# 리뷰 정보
review_elements = title.find_elements(By.XPATH, './/div[2]/span')
if len(review_elements) > 2:
store_info['rating'] = review_elements[0].text
store_info['visited_review'] = review_elements[1].text
store_info['blog_review'] = review_elements[2].text
elif len(review_elements) > 1:
store_info['visited_review'] = review_elements[0].text
store_info['blog_review'] = review_elements[1].text
# 가게 주소 및 영업 시간
store_info['address'] = driver.find_element(By.XPATH, '//span[@class="LDgIH"]').text
try:
driver.find_element(By.XPATH, '//div[@class="y6tNq"]//span').click()
sleep(2)
parent_element = driver.find_element(By.XPATH, '//a[@class="gKP9i RMgN0"]')
child_elements = parent_element.find_elements(By.XPATH, './*[@class="w9QyJ" or @class="w9QyJ undefined"]')
store_info['business_hours'] = [child.text for child in child_elements]
store_info['phone_num'] = driver.find_element(By.XPATH, '//span[@class="xlx7Q"]').text
except:
pass
# 메뉴 정보
scroll_to_bottom() # 한 번에 밑으로 스크롤
sleep(2)
menu_elements_1 = driver.find_elements(By.XPATH, '//ul[contains(@class, "jnwQZ")]/li')
menu_elements_2 = driver.find_elements(By.XPATH, '//ul[contains(@class, "t1osG")]/li')
store_info['menu'] = []
# 첫 번째 구조 처리
for menu_element in menu_elements_1:
name = menu_element.find_element(By.XPATH, './/a').text.strip()
price = menu_element.find_element(By.XPATH, './/div[@class="mkBm3"]').text.strip()
store_info['menu'].append({"name": name, "price": price})
# 두 번째 구조 처리
for menu_element in menu_elements_2:
name = menu_element.find_element(By.XPATH, './/span[@class="VQvNX"]').text.strip()
price = menu_element.find_element(By.XPATH, './/div[@class="gl2cc"]').text.strip()
store_info['menu'].append({"name": name, "price": price})
# 방문자 리뷰
review_elements = driver.find_elements(By.XPATH, '//ul[@class="K4J9r"]')
store_info['top_reviews'] = [review_element.text for review_element in review_elements[:5]]
# 테마 키워드
theme_elements = driver.find_elements(By.XPATH, '//ul[contains(@class, "v4tIa")]/li')
# 각 테마 키워드 요소를 순회
for theme_element in theme_elements:
# 카테고리 이름 추출
category_name = theme_element.find_element(By.CLASS_NAME, 'pNnVF').text.strip()
# 키워드 추출
keyword_container = theme_element.find_element(By.CLASS_NAME, 'sJgQj')
keywords = keyword_container.find_elements(By.TAG_NAME, 'span')
keyword_texts = [keyword.text.strip() for keyword in keywords]
# 카테고리에 맞게 키워드 추가
if category_name in store_info["theme_keywords"]:
store_info["theme_keywords"][category_name].extend(keyword_texts)
else:
print(f"알 수 없는 카테고리: {category_name}")
except Exception as ex:
print('------------ 데이터 크롤링 오류 ------------' )
print(ex)
store_data.append(store_info)
switch_left()
sleep(2)
# JSON 형식으로 저장
# #######!!!!!!!!n번째 실행할 경우, 아래 cafe_data 뒤에 넘버링 필수!!!!!!!!#######
with open("cafe_data3.json", "w", encoding="utf-8") as f:
json.dump(store_data, f, ensure_ascii=False, indent=4)
# 출력 (테스트용)
print(json.dumps(store_info, ensure_ascii=False, indent=4))
elapsed_time = time.time() - start_time # 소요 시간 계산
total_elapsed_time += elapsed_time # 누적 시간 갱신
print(f"Crawling Time: {elapsed_time:.2f} seconds") # 소요 시간 출력
print(f"Total Elapsed Time: {total_elapsed_time:.2f} seconds") # 누적 시간 출력
# 다음 페이지로 이동
next_page = driver.find_element(By.XPATH, '//*[@id="app-root"]/div/div[2]/div[2]/a[7]').get_attribute('aria-disabled')
if next_page == 'false':
driver.find_element(By.XPATH, '//*[@id="app-root"]/div/div[2]/div[2]/a[7]').click()
else:
break
import json
import csv
# JSON을 CSV로 변환
with open('C:/Users/82106/Desktop/데이터분석/강의/10.최종프로젝트/참고 데이터/드라이브 다운로드/Crawling/cafe_data3.json', 'r', encoding='utf-8') as input_file, \
open('C:/Users/82106/Desktop/데이터분석/강의/10.최종프로젝트/참고 데이터/드라이브 다운로드/Crawling/cafe_data3.csv', 'w', newline='', encoding='utf-8') as output_file:
data = json.load(input_file)
# CSV writer 초기화
f = csv.writer(output_file)
# CSV 파일에 header 추가
headers = [
'index', 'page', 'store_name', 'category', 'is_ad', 'rating',
'visited_review', 'blog_review', 'address', 'business_hours',
'phone_num', 'menu', 'top_reviews', 'theme_keywords'
]
f.writerow(headers)
# 데이터가 리스트인지 확인
if isinstance(data, list):
for store in data:
# 리스트나 딕셔너리를 문자열로 변환
business_hours = json.dumps(store.get('business_hours', []), ensure_ascii=False)
menu = json.dumps(store.get('menu', []), ensure_ascii=False)
top_reviews = json.dumps(store.get('top_reviews', []), ensure_ascii=False)
theme_keywords = json.dumps(store.get('theme_keywords', {}), ensure_ascii=False)
row = [
store.get('index', ''),
store.get('page', ''),
store.get('store_name', ''),
store.get('category', ''),
store.get('is_ad', ''),
store.get('rating', 0.0),
store.get('visited_review', 0),
store.get('blog_review', 0),
store.get('address', ''),
business_hours,
store.get('phone_num', ''),
menu,
top_reviews,
theme_keywords
]
f.writerow(row)
else:
print("JSON 데이터가 리스트 형태가 아닙니다.")
진심.. 최고임 (나는 못해)
2. API 수정
- 이미지 정보 조회 - contentId -> 전체 리스트 코드 필요!
contentId 1개는 잘 받아짐 - 안 들어가 있는 정보들도 있어서 안 나온 거였음. 다른 Id 넣어보니 잘 됨 (전체 id 리스트 나오게 하는 코드)
- data정보/ category정보/
- 지역기반 관광정보 조회
areaCode 3 대전 정보만 나오게 가능
- 위치기반 관광정보 조회
[수정] 파라미터 없는 값 지웠음 area code, 시군구 등
전체 코드 numOfRows pageNo listYN Y
--> 반려동물동반 조회 API 정보가 필요한데, 여기 필수 파라미터 중 하나가 contentId (지역정보기반 조회에 있음).
대전 areacode = 3 으로 하고 contentId를 조회해 봤을 때, page = 2로 지정하고 돌리면, total이 19개 로 뜸. (행 1000개씩 page =1 은 값이 쭉 나와서 total 확인 불가)
다른 지역 서울 areacode = 1 로 하고 해도 마찬가지로 page = 2 로 했더니 total 190개 뜸.
즉, 이 API 에서는 대전관련 contentId 가 19개 밖에 없다는 뜻. 반려동물동반 조회도 19개만 가능. -> 사용무리.
방법 바꿈 (웹크롤링 - 네이버 애견동반 키워드 로!)
# 반려동물 동반여행조회
url = f"{BASE_URL}/detailPetTour"
headers = {'Accept': 'application/json'}
df_result = None
for content_id in contentid_list:
params = {
"MobileOS": "ETC",
"MobileApp": "AppTest",
"serviceKey": AUTH_KEY,
"_type": "json",
"contentId": content_id
}
response = requests.get(url, params=params, headers=headers)
if response.status_code == 200:
print(response.text)
data = response.json()
if 'items' in data.get('response', {}).get('body', {}):
items = data['response']['body']['items']['item']
df_pet = pd.DataFrame(items)
print("반려동물 동반여행 데이터프레임:\n", df_pet.head())
#all_data.append(("반려동물 동반여행 정보", df_area))
if df_result is None:
df_result = df_pet
else:
df_result = pd.concat([df_result, df_pet], ignore_index=True)
else:
print("응답 데이터에 'items'가 없습니다.")
else:
print(f"API 요청 실패: {response.status_code}")
print(response.text) # 오류 메시지 출력
# 데이터 저장
df_result.to_csv("pet_tour.csv", index=False)
print("데이터가 'pet_tour.csv'로 저장되었습니다.")
df2 = df_result.merge(df, how='inner', on='contentid')
df2.head(1)
df3=df2[["contentid","title","acmpyTypeCd", "etcAcmpyInfo", "acmpyPsblCpam", "acmpyNeedMtr"]]
df3
#유성구(11)
https://dogagit.tistory.com/653
#중구(3)
https://dogagit.tistory.com/655
#동구(4)
https://dogagit.tistory.com/654
#서구(12)
https://dogagit.tistory.com/652
3. 웹크롤링 (애견동반) _ 코드수정 필요 (필터클릭, 애견동반클릭)
4. 웹크롤링 (호텔)
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.action_chains import ActionChains
import json
import time
from time import sleep
import random
import re
# 크롤링 데이터 저장용 리스트
store_data = []
# 왼쪽 iframe 전환
def switch_left():
driver.switch_to.parent_frame()
iframe = driver.find_element(By.XPATH, '//*[@id="searchIframe"]')
driver.switch_to.frame(iframe)
# 오른쪽 iframe 전환
def switch_right():
driver.switch_to.parent_frame()
iframe = driver.find_element(By.XPATH, '//*[@id="entryIframe"]')
driver.switch_to.frame(iframe)
options = webdriver.ChromeOptions()
options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3')
options.add_argument('window-size=1380,900')
driver = webdriver.Chrome(options=options)
# 네이버 지도 URL
# 지도에 검색한 키워드 URL을 아래 복붙
driver.get(url=URL)
driver.implicitly_wait(5)
# 한 번에 끝까지 스크롤
def scroll_to_bottom():
last_height = driver.execute_script("return document.body.scrollHeight") # 초기 높이 가져오기
while True:
# 페이지 끝까지 스크롤
driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
sleep(2) # 로딩 대기
new_height = driver.execute_script("return document.body.scrollHeight") # 새로운 높이 가져오기
if new_height == last_height: # 높이가 더 이상 변하지 않으면 종료
break
last_height = new_height
# 1000px씩 끊어서 스크롤
def scroll_to_1000px(scrollable_element):
last_height = driver.execute_script("return arguments[0].scrollHeight", scrollable_element) # 초기 높이 가져오기
while True:
# 요소 내에서 아래로 1000px 스크롤
driver.execute_script("arguments[0].scrollTop += 1000;", scrollable_element)
sleep(2) # 로딩 대기
new_height = driver.execute_script("return arguments[0].scrollHeight", scrollable_element) # 새로운 높이 가져오기
if new_height == last_height: # 높이가 더 이상 변하지 않으면 종료
break
last_height = new_height
# 누적 크롤링 시간 확인용 변수
total_elapsed_time = 0
#####중간에 멈췄을 경우 아래 코드 실행#####
# switch_left()
# # 페이지 이동 필요만큼 아래 두 행 복붙
# driver.find_element(By.XPATH, '//*[@id="app-root"]/div/div[2]/div[2]/a[last()]').click()
# sleep(1)
# driver.find_element(By.XPATH, '//*[@id="app-root"]/div/div[2]/div[2]/a[last()]').click()
# sleep(1)
# driver.find_element(By.XPATH, '//*[@id="app-root"]/div/div[2]/div[2]/a[last()]').click()
# sleep(1)
# driver.find_element(By.XPATH, '//*[@id="app-root"]/div/div[2]/div[2]/a[last()]').click()
# sleep(1)
while True:
switch_left()
# 맨 밑까지 스크롤 (1000px 씩 끊어서)
scrollable_element = driver.find_element(By.CLASS_NAME, "Ryr1F")
scroll_to_1000px(scrollable_element)
# 현재 페이지 번호
page_no = driver.find_element(By.XPATH, '//a[contains(@class, "mBN2s qxokY")]').text
# n번째 가게부터 시작
#####처음 실행할 경우#####
elements = driver.find_elements(By.XPATH, '//*[@id="_pcmap_list_scroll_container"]//li')
# # #####중간에 멈춰서 다시 실행할 경우 아래 코드의 숫자 수정 필요#####
# # elements = driver.find_elements(By.XPATH, '//*[@id="_pcmap_list_scroll_container"]//li')[7:] if page_no == '5' else driver.find_elements(By.XPATH, '//*[@id="_pcmap_list_scroll_container"]//li')
for index, e in enumerate(elements, start=1):
# 광고 태그
is_ad = False
# 광고면 is_ad = True 아니면 기존값 False
try:
ad_tag = e.find_element(By.XPATH, './/span[@class="place_blind" and text()="광고"]')
if ad_tag:
is_ad = True
except:
pass
# 데이터 초기화
store_info = {
'index' : index,
'page' : page_no,
'store_name' : '',
'category' : '',
'is_ad' : is_ad,
'rating' : 0.0,
'visited_review' : 0,
'blog_review' : 0,
'address' : '',
'link' : '',
'phone_num' : '',
'menu' : [],
'keyword_reviews' : [],
}
# 크롤링 시간 확인용
start_time = time.time()
# 가게 클릭 및 상세 페이지 이동
clickable_element = e.find_element(By.CLASS_NAME, 'ouxiq').find_element(By.XPATH, ".//a/div/div/span[1]")
if is_ad == True:
# 요소의 위치 가져오기
location = clickable_element.location
size = clickable_element.size
# 광고 태그 크기 확인
ad_tag_width = ad_tag.size['width']
# 약간 오른쪽으로 위치 조정 (x_offset 값 수정 가능)
x_offset = ad_tag_width + 5 # 가장 왼쪽에서 광고 태그만큼 5 px 이동
y_offset = size['height'] // 2 # 요소의 세로 중심
# ActionChains로 클릭 위치 이동
actions = ActionChains(driver)
actions.move_to_element_with_offset(clickable_element, x_offset, y_offset).click().perform()
else:
clickable_element.click()
sleep(2)
switch_right()
try:
# 상세 정보 크롤링
try:
title = driver.find_element(By.XPATH, '//div[@class="zD5Nm undefined"]')
store_info['store_name'] = title.find_element(By.XPATH, './/div[1]/div[1]/span[1]').text
store_info['category'] = title.find_element(By.XPATH, './/div[1]/div[1]/span[2]').text
except:
pass
# 리뷰 정보
try:
review_elements = title.find_elements(By.XPATH, './/div[2]/span')
if len(review_elements) > 2:
store_info['rating'] = review_elements[0].text
store_info['visited_review'] = review_elements[1].text
store_info['blog_review'] = review_elements[2].text
elif len(review_elements) > 1:
store_info['visited_review'] = review_elements[0].text
store_info['blog_review'] = review_elements[1].text
except:
pass
# 가게 주소 및 영업 시간
try:
store_info['address'] = driver.find_element(By.XPATH, '//span[@class="LDgIH"]').text
except:
pass
try:
store_info['phone_num'] = driver.find_element(By.XPATH, '//span[@class="xlx7Q"]').text
except:
pass
# 링크
try:
store_info['link'] = driver.find_element(By.XPATH, '//div[@class="jO09N"]/a').text
except:
pass
# 호텔 최저가
try:
scroll_to_bottom()
sleep(2)
store_info['menu'] = driver.find_element(By.XPATH, '//div[@class="aqscI"]/em').text
except:
pass
# 방문자 리뷰
try:
driver.find_element(By.XPATH, '//div[@class="KERaF"]').click()
sleep(2)
parent_element_review = driver.find_element(By.XPATH, '//div[@class="wvfSn"]')
child_elements_review = parent_element_review.find_elements(By.XPATH, './div[@class="jypaX" or @class="mrSZf"]')
store_info['keyword_reviews'] = [child.text for child in child_elements_review]
except:
pass
except Exception as ex:
print('------------ 데이터 크롤링 오류 ------------' )
print(ex)
store_data.append(store_info)
switch_left()
sleep(2)
# JSON 형식으로 저장
# #######!!!!!!!!n번째 실행할 경우, 아래 cafe_data 뒤에 넘버링 필수!!!!!!!!#######
with open("hotel_data.json", "w", encoding="utf-8") as f:
json.dump(store_data, f, ensure_ascii=False, indent=4)
# 출력 (테스트용)
print(json.dumps(store_info, ensure_ascii=False, indent=4))
elapsed_time = time.time() - start_time # 소요 시간 계산
total_elapsed_time += elapsed_time # 누적 시간 갱신
print(f"Crawling Time: {elapsed_time:.2f} seconds") # 소요 시간 출력
print(f"Total Elapsed Time: {total_elapsed_time:.2f} seconds") # 누적 시간 출력
# 다음 페이지로 이동
next_page = driver.find_element(By.XPATH, '//*[@id="app-root"]/div/div[2]/div[2]/a[last()]').get_attribute('aria-disabled')
if next_page == 'false':
driver.find_element(By.XPATH, '//*[@id="app-root"]/div/div[2]/div[2]/a[last()]').click()
else:
break