1) The DB schema is as follows (a hedged sketch of the assumed table definitions appears right after the two desc commands below).

* desc rnd

* desc rnd_category
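
The desc output itself is not reproduced here. Going only by the columns referenced in the queries in db.py below, a minimal sketch of the two tables might look like the following; the column names come from the code, while the types, lengths, and keys are assumptions:

create table rnd (
  mcd varchar(10),       -- main category code, e.g. 'm03'
  scd varchar(10),       -- sub category code, e.g. 's21'
  idx int,               -- post id parsed from the board's onclick handler
  title varchar(500),
  link varchar(1000),
  period varchar(100),
  government varchar(200),
  posted varchar(20)     -- posted date as shown on the board
);

create table rnd_category (
  cd varchar(10),        -- category code ('m03', 's21', ...)
  val varchar(100),      -- value appended to the board URL (div_code)
  name varchar(100)      -- display name used as the Slack pretext
);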



2) The development environment is macOS; the service runs on an AWS EC2 Ubuntu instance.

3) The local DB on macOS is MySQL running in Docker; the production DB is MySQL on AWS RDS.


4) The Python source crawls the site, stores the results in the DB, and sends a summary to Slack as a message.


5) The source is below. It is short, so only brief comments are included.


#config.py


class Configuration:

  @staticmethod
  def get_configuration(choose):
    # Return DB connection parameters for the chosen platform ('mac' or 'ubuntu').
    if choose == 'mac':
      connect_value = dict(host='127.0.0.1',
        user='xxxx',
        password='xxxx',
        database='xxxx',
        port=3306,
        charset='utf8')
    elif choose == 'ubuntu':
      connect_value = dict(host='xxxx.amazonaws.com',
        user='xxxx',
        password='xxxx',
        database='xxxx',
        port=3306,
        charset='utf8')
    else:
      print('Not Selected')
      connect_value = None

    return connect_value
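
For reference, the dict returned above has exactly the keyword parameters that DBConnection in db.py (shown next) expects, so it can be unpacked straight into the constructor; a minimal usage sketch with the placeholder values:

from config import Configuration
from db import DBConnection

cfg = Configuration.get_configuration('mac')  # or 'ubuntu' on the EC2 host
conn = DBConnection(**cfg)                    # keys match DBConnection's keyword parameters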


#db.py

import pymysql


class DBConnection:

  def __init__(self,host,user,password,database,charset,port):
    # Open a pymysql connection that returns rows as dicts.
    self.connection = pymysql.connect(
      host=host,
      user=user,
      password=password,
      db=database,
      charset=charset,
      port=port,
      cursorclass=pymysql.cursors.DictCursor)

  def exec_select_rnd(self,idx,mcd,scd):
    # Count rows already stored for this idx/mcd/scd combination.
    data = None
    with self.connection.cursor() as cursor:
      query = Query().get_select_rnd(idx,mcd,scd)
      cursor.execute(query)
      for row in cursor:
        data = row.get('cnt')
    #print('data: ',data)
    return data

  def exec_select_category(self,choose,code):
    # choose == 1 -> category value, choose == 2 -> category name.
    data = None
    with self.connection.cursor() as cursor:
      query = Query().get_select_category(choose,code)
      cursor.execute(query)
      for row in cursor:
        if choose == 1:
          data = row.get('val')
        elif choose == 2:
          data = row.get('name')
    #print('data: ',data)
    return data

  def exec_insert_rnd(self,mcd,scd,idx,title,link,period,government,posted):
    #print('exec insert crawling: ',mcd,scd,idx,title,period,government,posted)
    query = Query().get_insert_rnd(mcd,scd,idx,title,link,period,government,posted)
    with self.connection.cursor() as cursor:
      cursor.execute(query)
    self.connection.commit()

  def close(self):
    self.connection.close()

  def commit(self):
    self.connection.commit()


class Query:

  def get_select_rnd(self,idx,mcd,scd):
    # NOTE: values are interpolated directly into the SQL string.
    query = 'select count(*) as cnt from rnd ' \
            'where idx={} and mcd=\'{}\' and scd=\'{}\''.format(idx,mcd,scd)
    #print('getselect: ',idx,query)
    return query

  def get_select_category(self,choose,code):
    field = ''
    if choose == 1:
      field = 'val'
    elif choose == 2:
      field = 'name'
    query = 'select {} from rnd_category ' \
            'where cd=\'{}\''.format(field,code)
    #print('getselectcode: ',query)
    return query

  def get_insert_rnd(self,mcd,scd,idx,title,link,period,government,posted):
    query = 'insert into rnd (mcd,scd,idx,title,link,period,government,posted) ' \
            'values (\'{}\',\'{}\',{},\'{}\',\'{}\',\'{}\',\'{}\',\'{}\')'.format(
              mcd,scd,idx,title,link,period,government,posted)
    #print('getinsert: ',query)
    return query
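
The Query class builds each statement with str.format, so a quote character in a crawled title would break the insert. As a sketch of an alternative (not part of the original source), the same insert can be written with pymysql's parameter binding; exec_insert_rnd_params below is a hypothetical helper that takes an open pymysql connection, not a method from db.py:

def exec_insert_rnd_params(connection, mcd, scd, idx, title, link, period, government, posted):
  # pymysql escapes each bound value, so quotes inside a title are handled safely.
  sql = ('insert into rnd (mcd,scd,idx,title,link,period,government,posted) '
         'values (%s,%s,%s,%s,%s,%s,%s,%s)')
  with connection.cursor() as cursor:
    cursor.execute(sql, (mcd, scd, idx, title, link, period, government, posted))
  connection.commit()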


#mod_creativekorea.py

import re


class Tag:

  @staticmethod
  def lately_news(soup):
    # Grab the most recent post from the board table (class 'tbl1');
    # the post id is parsed out of the first anchor's onclick handler.
    table = soup.find('table',{'class':'tbl1'})
    td = table.find_all('td')
    onclick = td[1].find('a').attrs['onclick']
    title = Utilities.remove_keyword(td[1].find('a').get_text().strip())
    posted = td[3].get_text().strip()
    tmp = Utilities.make_array(str(onclick),',')
    tmp2 = Utilities.make_array(str(tmp[0]),'(')
    idx = str(tmp2[1])
    return idx + ';' + title + ';' + posted


class Utilities:

  @staticmethod
  def make_array(string,keyword):
    # Split a string on the given keyword and return the parts.
    return string.split(keyword)

  @staticmethod
  def remove_keyword(string):
    # Strip double quotes from a crawled title.
    return string.replace('\"', '')
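
The real onclick value from the board page is not shown in the post. Assuming it looks something like goDetail(12345,1), which is only a guess at the format, lately_news extracts the post id as follows:

from mod_creativekorea import Utilities

onclick = 'goDetail(12345,1)'               # hypothetical onclick attribute value

tmp = Utilities.make_array(onclick, ',')    # ['goDetail(12345', '1)']
tmp2 = Utilities.make_array(tmp[0], '(')    # ['goDetail', '12345']
idx = tmp2[1]                               # '12345' -> stored as rnd.idx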


#creativekorea.py -> python3 creativekorea.py m03 s21 [mac|ubuntu]
#crawls the Busan Center for Creative Economy & Innovation > Notice > Business announcement board

import os
import requests
from bs4 import BeautifulSoup
from selenium import webdriver
import datetime
import sys
from config import Configuration
from db import DBConnection,Query
from mod_creativekorea import Utilities,Tag
from slacker import Slacker


class News:

  def __init__(self,mcd,scd,platform):
    #print('init')
    self.mcd = mcd
    self.scd = scd
    self.platform = platform

  def set_params(self):
    # Missing CLI arguments fall back to '' so validate() can apply the defaults.
    self.mcd = sys.argv[1] if len(sys.argv) > 1 else ''
    self.scd = sys.argv[2] if len(sys.argv) > 2 else ''
    self.platform = sys.argv[3] if len(sys.argv) > 3 else ''

  def validate(self):
    default = {
      'mcd':'m03',
      'scd':'s21',
      'platform':'mac'}

    self.mcd = default.get('mcd') if self.mcd == '' else self.mcd.lower()
    self.scd = default.get('scd') if self.scd == '' else self.scd.lower()
    self.platform = default.get('platform') if self.platform == '' else self.platform.lower()

  def url(self):
    if self.mcd == 'm03':
      url = 'https://ccei.creativekorea.or.kr/busan/allim/allim_list.do?page=1&div_code='
    return url

  def send_slack(self,channel,pretext,text,title,link):
    print('slack',channel,pretext,text,title,link)

    attachments_dict = dict()
    attachments_dict['pretext'] = pretext
    attachments_dict['title'] = title
    attachments_dict['title_link'] = link
    attachments_dict['text'] = text
    attachments = [attachments_dict]

    token = 'xoxp-xxx-xxxx-xxxx-xxxx'
    slack = Slacker(token)
    slack.chat.post_message(channel=channel, text=None, attachments=attachments, as_user=True)

  def make_message(self,pretext,news_field,url):
    text = '등록일: ' + news_field[2]  # '등록일' = posted date
    self.send_slack('#channel', pretext, text, str(news_field[1]), url)

  def crawling(self):
    #print('crawling')
    self.validate()

    try:
      options = webdriver.ChromeOptions()
      options.add_argument('headless')
      options.add_argument('window-size=1920x1080')
      options.add_argument("disable-gpu")

      configuration = Configuration.get_configuration(self.platform)
      _host = configuration['host']
      _user = configuration['user']
      _password = configuration['password']
      _database = configuration['database']
      _port = configuration['port']
      _charset = configuration['charset']

      conn = DBConnection(host=_host,
        user=_user,
        password=_password,
        database=_database,
        port=_port,
        charset=_charset)

      # Look up the board's div_code and its display name from rnd_category.
      cd_val = conn.exec_select_category(1,self.scd)
      cd_name = conn.exec_select_category(2,self.mcd)

      print('cd_val',cd_val)
      print('cd_name',cd_name)

      # Path to the local chromedriver binary; adjust per environment.
      driver = webdriver.Chrome('/Users/gyunseul9/Bitbucket/rnd_news/chromedriver', chrome_options=options)
      driver.implicitly_wait(3)
      driver.get(self.url()+cd_val)

      html = driver.page_source
      soup = BeautifulSoup(html, 'html.parser')

      # 'idx;title;posted' string for the most recent post.
      lately_news = Tag.lately_news(soup)
      news_field = Utilities.make_array(lately_news,';')
      field_size = len(news_field)

      print('lately_news', lately_news)
      print('field_size', field_size)

      for i in range(0, field_size):
        print(i,news_field[i])

      # Insert and notify only when the post is not already stored.
      cnt = conn.exec_select_rnd(news_field[0],self.mcd.lower(),self.scd.lower())

      if cnt:
        print('overlap idx field: ',cnt,self.mcd.lower(),self.scd.lower())
      else:
        print('does not overlap idx field: ',cnt,self.mcd.lower(),self.scd.lower())
        conn.exec_insert_rnd(self.mcd.lower(),self.scd.lower(),news_field[0],news_field[1],self.url()+cd_val,'-','-',news_field[2])

        self.make_message(cd_name,news_field,self.url()+cd_val)

      driver.quit()
      conn.close()

    except Exception as e:
      with open('./creativekorea_error.log','a') as file:
        file.write('{} YOU GOT AN ERROR: {}\n'.format(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),str(e)))


def run():
  news = News('','','')
  news.set_params()
  news.crawling()


if __name__ == "__main__":
  run()

Posted by 앤비