123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353 |
- from abc import ABCMeta, abstractmethod, abstractproperty
- from oauth.models import OAuthUser, OAuthConfig
- from django.conf import settings
- import requests
- import json
- import logging
- import urllib.parse
- from website.utils import parse_dict_to_url
- logger = logging.getLogger(__name__)
- class BaseOauthManager(metaclass=ABCMeta):
- """获取用户授权"""
- AUTH_URL = None
- """获取token"""
- TOKEN_URL = None
- """获取用户信息"""
- API_URL = None
- '''icon图标名'''
- ICON_NAME = None
- def __init__(self, access_token=None, openid=None):
- self.access_token = access_token
- self.openid = openid
- @property
- def is_access_token_set(self):
- return self.access_token is not None
- @property
- def is_authorized(self):
- return self.is_access_token_set and self.access_token is not None and self.openid is not None
- @abstractmethod
- def get_authorization_url(self, nexturl='/'):
- pass
- @abstractmethod
- def get_access_token_by_code(self, code):
- pass
- @abstractmethod
- def get_oauth_userinfo(self):
- pass
- def do_get(self, url, params):
- rsp = requests.get(url=url, params=params)
- return rsp.text
- def do_post(self, url, params):
- rsp = requests.post(url, params)
- return rsp.text
- def get_config(self):
- value = OAuthConfig.objects.filter(type=self.ICON_NAME)
- return value[0] if value else None
- class WBOauthManager(BaseOauthManager):
- AUTH_URL = 'https://api.weibo.com/oauth2/authorize'
- TOKEN_URL = 'https://api.weibo.com/oauth2/access_token'
- API_URL = 'https://api.weibo.com/2/users/show.json'
- ICON_NAME = 'weibo'
- def __init__(self, access_token=None, openid=None):
- config = self.get_config()
- self.client_id = config.appkey if config else ''
- self.client_secret = config.appsecret if config else ''
- self.callback_url = config.callback_url if config else ''
- super(WBOauthManager, self).__init__(access_token=access_token, openid=openid)
- def get_authorization_url(self, nexturl='/'):
- params = {
- 'client_id': self.client_id,
- 'response_type': 'code',
- 'redirect_uri': self.callback_url + '&next_url=' + nexturl
- }
- url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
- return url
- def get_access_token_by_code(self, code):
- params = {
- 'client_id': self.client_id,
- 'client_secret': self.client_secret,
- 'grant_type': 'authorization_code',
- 'code': code,
- 'redirect_uri': self.callback_url
- }
- rsp = self.do_post(self.TOKEN_URL, params)
- try:
- obj = json.loads(rsp)
- self.access_token = str(obj['access_token'])
- self.openid = str(obj['uid'])
- return self.get_oauth_userinfo()
- except Exception as e:
- logger.error(e)
- return None
- def get_oauth_userinfo(self):
- if not self.is_authorized:
- return None
- params = {
- 'uid': self.openid,
- 'access_token': self.access_token
- }
- rsp = self.do_get(self.API_URL, params)
- try:
- datas = json.loads(rsp)
- user = OAuthUser()
- user.matedata = rsp
- user.picture = datas['avatar_large']
- user.nikename = datas['screen_name']
- user.openid = datas['id']
- user.type = 'weibo'
- user.token = self.access_token
- if 'email' in datas and datas['email']:
- user.email = datas['email']
- return user
- except Exception as e:
- logger.error(e)
- logger.error('weibo oauth error.rsp:' + rsp)
- return None
- class GoogleOauthManager(BaseOauthManager):
- AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth'
- TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token'
- API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
- ICON_NAME = 'google'
- def __init__(self, access_token=None, openid=None):
- config = self.get_config()
- self.client_id = config.appkey if config else ''
- self.client_secret = config.appsecret if config else ''
- self.callback_url = config.callback_url if config else ''
- super(GoogleOauthManager, self).__init__(access_token=access_token, openid=openid)
- def get_authorization_url(self, nexturl='/'):
- params = {
- 'client_id': self.client_id,
- 'response_type': 'code',
- 'redirect_uri': self.callback_url,
- 'scope': 'openid email',
- }
- # url = self.AUTH_URL + "?" + urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
- url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
- return url
- def get_access_token_by_code(self, code):
- params = {
- 'client_id': self.client_id,
- 'client_secret': self.client_secret,
- 'grant_type': 'authorization_code',
- 'code': code,
- 'redirect_uri': self.callback_url
- }
- rsp = self.do_post(self.TOKEN_URL, params)
- obj = json.loads(rsp)
- try:
- self.access_token = str(obj['access_token'])
- self.openid = str(obj['id_token'])
- logger.info(self.ICON_NAME + ' oauth ' + rsp)
- return self.access_token
- except Exception as e:
- logger.error(e)
- logger.error(self.ICON_NAME + ' oauth error ' + rsp)
- return None
- def get_oauth_userinfo(self):
- if not self.is_authorized:
- return None
- params = {
- 'access_token': self.access_token
- }
- rsp = self.do_get(self.API_URL, params)
- try:
- datas = json.loads(rsp)
- user = OAuthUser()
- user.matedata = rsp
- user.picture = datas['picture']
- user.nikename = datas['name']
- user.openid = datas['sub']
- user.token = self.access_token
- user.type = 'google'
- if datas['email']:
- user.email = datas['email']
- return user
- except Exception as e:
- logger.error(e)
- logger.error('google oauth error.rsp:' + rsp)
- return None
- class GitHubOauthManager(BaseOauthManager):
- AUTH_URL = 'https://github.com/login/oauth/authorize'
- TOKEN_URL = 'https://github.com/login/oauth/access_token'
- API_URL = 'https://api.github.com/user'
- ICON_NAME = 'github'
- def __init__(self, access_token=None, openid=None):
- config = self.get_config()
- self.client_id = config.appkey if config else ''
- self.client_secret = config.appsecret if config else ''
- self.callback_url = config.callback_url if config else ''
- super(GitHubOauthManager, self).__init__(access_token=access_token, openid=openid)
- def get_authorization_url(self, nexturl='/'):
- params = {
- 'client_id': self.client_id,
- 'response_type': 'code',
- 'redirect_uri': self.callback_url + '&next_url=' + nexturl,
- 'scope': 'user'
- }
- # url = self.AUTH_URL + "?" + urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
- url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
- return url
- def get_access_token_by_code(self, code):
- params = {
- 'client_id': self.client_id,
- 'client_secret': self.client_secret,
- 'grant_type': 'authorization_code',
- 'code': code,
- 'redirect_uri': self.callback_url
- }
- rsp = self.do_post(self.TOKEN_URL, params)
- try:
- from urllib import parse
- r = parse.parse_qs(rsp)
- self.access_token = (r['access_token'][0])
- return self.access_token
- except Exception as e:
- logger.error(e)
- return None
- def get_oauth_userinfo(self):
- params = {
- 'access_token': self.access_token
- }
- rsp = self.do_get(self.API_URL, params)
- try:
- datas = json.loads(rsp)
- user = OAuthUser()
- user.picture = datas['avatar_url']
- user.nikename = datas['name']
- user.openid = datas['id']
- user.type = 'github'
- user.token = self.access_token
- user.matedata = rsp
- if datas['email']:
- user.email = datas['email']
- return user
- except Exception as e:
- logger.error(e)
- logger.error('github oauth error.rsp:' + rsp)
- return None
- class FaceBookOauthManager(BaseOauthManager):
- AUTH_URL = 'https://www.facebook.com/v2.10/dialog/oauth'
- TOKEN_URL = 'https://graph.facebook.com/v2.10/oauth/access_token'
- API_URL = 'https://graph.facebook.com/me'
- ICON_NAME = 'facebook'
- def __init__(self, access_token=None, openid=None):
- config = self.get_config()
- self.client_id = config.appkey if config else ''
- self.client_secret = config.appsecret if config else ''
- self.callback_url = config.callback_url if config else ''
- super(FaceBookOauthManager, self).__init__(access_token=access_token, openid=openid)
- def get_authorization_url(self, nexturl='/'):
- params = {
- 'client_id': self.client_id,
- 'response_type': 'code',
- 'redirect_uri': self.callback_url, # + '&next_url=' + nexturl,
- 'scope': 'email,public_profile'
- }
- url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
- return url
- def get_access_token_by_code(self, code):
- params = {
- 'client_id': self.client_id,
- 'client_secret': self.client_secret,
- # 'grant_type': 'authorization_code',
- 'code': code,
- 'redirect_uri': self.callback_url
- }
- rsp = self.do_post(self.TOKEN_URL, params)
- try:
- obj = json.loads(rsp)
- token = str(obj['access_token'])
- self.access_token = token
- return self.access_token
- except Exception as e:
- logger.error(e)
- return None
- def get_oauth_userinfo(self):
- params = {
- 'access_token': self.access_token,
- 'fields': 'id,name,picture,email'
- }
- try:
- rsp = self.do_get(self.API_URL, params)
- datas = json.loads(rsp)
- user = OAuthUser()
- user.nikename = datas['name']
- user.openid = datas['id']
- user.type = 'facebook'
- user.token = self.access_token
- user.matedata = rsp
- if datas['email']:
- user.email = datas['email']
- if datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']:
- user.picture = str(datas['picture']['data']['url'])
- return user
- except Exception as e:
- logger.error(e)
- return None
- def get_oauth_apps():
- configs = OAuthConfig.objects.filter(is_enable=True).all()
- if not configs:
- return []
- configtypes = [x.type for x in configs]
- applications = BaseOauthManager.__subclasses__()
- apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes]
- return apps
- def get_manager_by_type(type):
- applications = get_oauth_apps()
- finds = list(filter(lambda x: x.ICON_NAME.lower() == type.lower(), applications))
- if finds:
- return finds[0]
- return None
|