본문 바로가기

빅데이터

twitter api + google cloud pub/sub 사용해보기

twitter api로 원하는 키워드 검색하고 pub/sub에 적용해 보자

 

1. google cloud pub/sub 주제 만들기

 

2. twitter streaming data를 google cloud pub/sub에 publish 하기

 

3. google cloud function을 이용하여 biqqurey에 연동하기

 


1. google cloud pub/sub 주제 만들기

 

 

 

** 프로젝트 만드는 방법

cloud.google.com/resource-manager/docs/updating-project?hl=ko

 

프로젝트 만들기 및 관리  |  Resource Manager 문서  |  Google Cloud

Google Cloud 프로젝트는 API를 관리하고 결제를 사용 설정하며 공동작업자를 추가 및 삭제하고 Google Cloud 리소스에 대한 권한을 관리하는 등 모든 Google Cloud 서비스를 만들고 사용 설정하고 사용하��

cloud.google.com

google cloud console에 접속하여 내 프로젝트에서 [제품] -> [빅데이터] -> [pub/sub]을 선택하자. (오른쪽 핀을 누르면 프로젝트 상단에 위치하게된당)

 

 

[주제 만들기]를 통해 topic id를 설정해서 만들어준다. 그 다음 프로젝트 권한을 위해 key 설정이 필요하다.

 

 

 

[IAM 및 관리자] -> [서비스 계정] -> [서비스 계정 만들기]를 통해 key를 발급 받자.

 

 

 

서비스 계정을 만들 때, 권한을 biqqurey, pub/sub 편집자로 주자. 이 때 검색을 pub/sub말고 게시/구독으로 해야한다..

 

 

처음 만들게 되면 키 없음인데 당황하지 않고 ...을 눌러준 뒤 키를 만들어 준다.

 

 

그럼 다음과 같이 JSON으로 설정해주면 자동으로 JSON key가 다운로드 된다.

 

 

 

2. twitter streaming data를 google cloud pub/sub에 publish 하기

 

아까 발급받은 JSON key를 프로젝트 폴더에 놓자.

 

pip install google-cloud-pubsub

google cloud pub/sub을 사용하기 위한 python 라이브러리르 다운로드 받자.

** 기존에 있는 설치되어 있는 python 환경에 설치하면 pyyaml 문제가 발생한다.. 새로운 python env를 만들어서 사용하자!

 

import json

import tweepy
from google.cloud import pubsub_v1

from google.oauth2 import service_account

key_path = "<json-key-path>"

credentials = service_account.Credentials.from_service_account_file(
   key_path,
   scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = pubsub_v1.PublisherClient(credentials=credentials)
topic_path = client.topic_path('<project-id>', '<topic-id>')

twitter_api_key = '<twitter_api_key>'
twitter_api_secret_key = '<twitter_api_secret_key>'
twitter_access_token = '<twitter_access_token>'
twitter_access_token_secret = '<twitter_access_token_secret>'

class SimpleStreamListener(tweepy.StreamListener):
   def on_status(self, status):
       print(status)
       tweet = json.dumps({'id': status.id, 'created_at': status.created_at, 'text': status.text}, default=str)
       client.publish(topic_path, data=tweet.encode('utf-8'))

   def on_error(self, status_code):
       print(status_code)
       if status_code == 420:
           return False

stream_listener = SimpleStreamListener()

auth = tweepy.OAuthHandler(twitter_api_key, twitter_api_secret_key)
auth.set_access_token(twitter_access_token, twitter_access_token_secret)

twitterStream = tweepy.Stream(auth, stream_listener)
twitterStream.filter(track=['bts'])

이제 tweet.py를 다음과 같이 수정하자! google cloud key, twitter api key, project id, topic id에 설정한 값을 채워 넣고 원하는 데이터를 받아와보자 (나는 bts를 했다.)

코드의 내용은 streaming 데이터의 id, created_at, text 값을 json으로 dump한 뒤, client(cloud pub)으로 전달해주는 간단한 코드이다.

 

** filter의 track을 처음에 'bts'로 줬는데 자꾸 os error가 발생했다. 검색해보니 멀티프로세싱쪽 문제인 것 같은데, 데이터 입력이 너무 많아서 인 것 같아서 'bts!'로 주니깐 데이터 입력도 그렇게 많지 않아서 안정적으로 잘 동작했다.

 

실행하면 쭈루룩 트윗 데이터가 streaming 된다. 이제 cloud pub/sub에 잘들어가는지 확인하자.

 

 

내가 streaming 해준 주제에 들어간다.

 

 

[메시지 보기] --> [주제 선택] --> [풀] 로 데이터가 잘 들어오는 지확인할 수 있다.

 

 

데이터가 잘 들어간당

 

 

3. google cloud function을 이용하여 biqqurey에 연동하기

 

이제 데이터를 bigquery에 연동해보자.

 

bigquery에 가서 데이터세트와 테이블을 만든다. bigqurey를 이제 cloud fucntion을 통해 pub/sub과 연동하자.

 

 

 

다시 pub/sub의 주제에 가서 CLOUD 함수 트리거로 간다.

 

 

트리거 유형을 pub/sub으로 저장 하고, 코드를 작성해야 한다.

 

런타임을 python으로 설정하면 main.py와 requirements.txt가 생성된다. main.py에는 메인코드(bigquery로 전달해줄), requirements.txt에는 main.py에 사용할 패키지목록을 작성하면 된당.

 

import base64
import json
from google.cloud import bigquery

def tweet2bigqurey(tweet):
     client = bigquery.Client()
     dataset_ref = client.dataset('<dataset-id>')
     table_ref = dataset_ref.table('<table-id>')
     table = client.get_table(table_ref)

     tweet_dict = json.loads(tweet)
     rows_to_insert = [
        (tweet_dict['id'], tweet_dict['created_at'], tweet_dict['text'])
    ]

     error = client.insert_rows(table, rows_to_insert)
     print(error)

def send(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    pubsub_message = base64.b64decode(event['data']).decode('utf-8')
    print(pubsub_message)
    tweet2bigqurey(pubsub_message)

main.py는 다음과 같다. bts 스트리밍 tweet 데이터를 bigqurey로 전송해주는 코드이다.

 

cloud.google.com/functions/docs/tutorials/pubsub?hl=ko#functions-change-directory-python

 

Cloud Pub/Sub 가이드  |  Cloud Functions 문서  |  Google Cloud

애플리케이션 준비 샘플 앱 저장소를 로컬 머신에 복제합니다. Node.js git clone https://github.com/GoogleCloudPlatform/nodejs-docs-samples.git 또는 zip 파일로 샘플을 다운로드하고 압축을 풀 수 있습니다. Python gi

cloud.google.com

다음 cloud pub/sub 가이드 라인을 통해 배포 예시를 확인할 수 있다. send 함수의 event, context 값의 자세한 사항을 확인해보자.

 

google-cloud-bigqurey

requirements.txt에는 bigqurey 사용을 위한 패키지만 작성한다.

 

 

전부 작성 후, 런타임 옆에 진입점에 함수의 이름, send로 설정하고 배포해주고 조금 기다리면 배포가 완료 된다.

이제 다시 tweet.py를 실행시켜 스트리밍 데이터를 pub에 전송해주자.

 

이제 bigquery에 간단한 sql을 작성해서 데이터가 들어가는지 확인할 수 있다.

 


해볼 것.

- GKE로 자동배포 

- rasperyy pi 센서데이터를 파이프라인을 pub/sub, bigquery와 연동해보기

- google assistant api 써보기

'빅데이터' 카테고리의 다른 글

google kubernetes engine으로 배포해보기  (0) 2020.09.24
python으로 twitter api 사용해보기  (0) 2020.09.23