Simple Pyramid Example

In this tutorial we will create a simple Pyramid application that will be able to log users in with Facebook, Twitter and OpenID and retrieve their 5 most recent tweets/statuses.

You can download all the source files we are about to create here.

First create the Config dictionary where you set up all the providers you want to use. You will need the consumer_key and consumer_secret which you can get here for Facebook and here for Twitter.

Note

Facebook and other OAuth 2.0 providers require a redirect URI. It should be the URL of the login request view which we will create in this tutorial; in our case its value will be http://[hostname]:[port]/login/fb for Facebook.
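As a quick illustration, the redirect URI to register with a provider can be derived from the host, the port and the internal provider name. This is only a sketch: the helper name redirect_uri and the example host are assumptions made for illustration and are not part of Authomatic.

```python
def redirect_uri(hostname, port, provider_name):
    """Build the URL of the login view for one provider (hypothetical helper)."""
    # The path mirrors the '/login/{provider_name}' route used in this tutorial.
    return 'http://{0}:{1}/login/{2}'.format(hostname, port, provider_name)


if __name__ == '__main__':
    print(redirect_uri('yourlocalhostalias.com', 8080, 'fb'))
```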

# config.py

from authomatic.providers import oauth2, oauth1, openid

CONFIG = {
    
    'tw': { # Your internal provider name
        
        # Provider class
        'class_': oauth1.Twitter,
        
        # Twitter is an AuthorizationProvider so we need to set several other properties too:
        'consumer_key': '########################',
        'consumer_secret': '########################',
    },
    
    'fb': {
           
        'class_': oauth2.Facebook,
        
        # Facebook is an AuthorizationProvider too.
        'consumer_key': '########################',
        'consumer_secret': '########################',
        
        # But it is also an OAuth 2.0 provider and it needs scope.
        'scope': ['user_about_me', 'email', 'publish_stream'],
    },
    
    'oi': {
           
        # OpenID provider dependent on the python-openid package.
        'class_': openid.OpenID,
    }
}
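A key that is missing from the Config is easy to overlook, so it can help to check the dictionary up front. The following is a minimal sketch and not an Authomatic feature: the function missing_settings and the list of required keys are assumptions based on the entries shown above, and the sample dictionary uses placeholder strings instead of the real provider classes so that it runs without Authomatic installed.

```python
# Keys that the OAuth entries in this tutorial's Config define (assumed list).
REQUIRED_OAUTH_KEYS = ('class_', 'consumer_key', 'consumer_secret')


def missing_settings(config, oauth_providers):
    """Return {provider_name: [missing keys]} for the given OAuth providers."""
    problems = {}
    for name in oauth_providers:
        settings = config.get(name, {})
        # A key counts as missing when it is absent or has an empty value.
        missing = [key for key in REQUIRED_OAUTH_KEYS if not settings.get(key)]
        if missing:
            problems[name] = missing
    return problems


if __name__ == '__main__':
    sample = {
        'tw': {'class_': 'oauth1.Twitter',
               'consumer_key': 'key',
               'consumer_secret': 'secret'},
        'fb': {'class_': 'oauth2.Facebook',
               'consumer_key': 'key'},
    }
    print(missing_settings(sample, ['tw', 'fb']))
```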

Create the main.py file and import what’s needed.

# main.py

from wsgiref.simple_server import make_server
from pyramid.config import Configurator
from pyramid.response import Response
from authomatic import Authomatic
from authomatic.adapters import WebObAdapter

from config import CONFIG

Make an instance of the Authomatic class and pass its constructor the Config together with a random secret string, which is used for session and CSRF token generation.

authomatic = Authomatic(config=CONFIG, secret='some random secret string')
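The secret should be hard to guess. One possible way to produce such a string, assuming Python 3.6 or newer, is the standard library secrets module. The function name generate_secret and the 32-byte length are arbitrary choices for illustration.

```python
import secrets


def generate_secret(num_bytes=32):
    """Return a random hexadecimal string suitable for use as a secret."""
    # token_hex() draws from the operating system's secure random source
    # and returns two hexadecimal characters per byte.
    return secrets.token_hex(num_bytes)


if __name__ == '__main__':
    print(generate_secret())
```

The generated value would typically be produced once and kept in configuration rather than regenerated on every start.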

Create a simple login view.

def login(request):

Inside the login view create a pyramid.response.Response instance which we need to pass to the WebObAdapter.

    response = Response()

Get the provider_name URL variable.

    provider_name = request.matchdict.get('provider_name')

Log the user in by calling the Authomatic.login() method inside the view. You must pass it an adapter for your framework and one of the provider names that you defined in the Config, which we get from the URL path of the request. Pyramid is based on WebOb, so the WebObAdapter will do the job. The method will redirect the user to the specified provider to prompt them for consent and then redirect them back to this view.

    result = authomatic.login(WebObAdapter(request, response), provider_name)

The login procedure is over when Authomatic.login() returns a LoginResult.

Warning

Do not write anything to the response unless the login procedure is over! Authomatic.login() returns either None, which means that the login procedure is still pending, or a LoginResult, which means that the login procedure is over.

    if result:
        response.write('<a href="..">Home</a>')

Hopefully there is no LoginResult.error but rather LoginResult.user. Most of the providers don’t provide user info on login. To get more user info we need to call the User.update() method.

        if result.error:
            response.write(u'<h2>Damn that error: {0}</h2>'.format(result.error.message))
        elif result.user:
            if not (result.user.name and result.user.id):
                result.user.update()

Now we can welcome the user by name.

            response.write(u'<h1>Hi {0}</h1>'.format(result.user.name))
            response.write(u'<h2>Your id is: {0}</h2>'.format(result.user.id))
            response.write(u'<h2>Your email is: {0}</h2>'.format(result.user.email))

Seems like we’re done, but we can do more:

If there are Credentials, the user has logged in with an AuthorizationProvider, i.e. OAuth 1.0a or OAuth 2.0, and we can access their protected resources.

            if result.user.credentials:

Each provider has its own specific API. Let’s first get the user’s five most recent Facebook statuses.

                if result.provider.name == 'fb':
                    response.write('You are logged in with Facebook.<br />')

Prepare the Facebook Graph API URL.

                    url = 'https://graph.facebook.com/{0}?fields=feed.limit(5)'
                    url = url.format(result.user.id)

Access the protected resource of the user at that URL.

                    access_response = result.provider.access(url)

Parse the response. The Response.data is a data structure (list or dictionary) parsed from the Response.content which usually is JSON.

                    if access_response.status == 200:
                        # Parse response.
                        # Default to an empty dict so a missing 'feed' does not raise.
                        statuses = access_response.data.get('feed', {}).get('data')
                        error = access_response.data.get('error')
                        
                        if error:
                            response.write(u'Damn that error: {0}!'.format(error))
                        elif statuses:
                            response.write('Your 5 most recent statuses:<br />')
                            for message in statuses:
                                
                                text = message.get('message')
                                date = message.get('created_time')
                                
                                response.write(u'<h3>{0}</h3>'.format(text))
                                response.write(u'Posted on: {0}'.format(date))
                    else:
                        response.write('Damn that unknown error!<br />')
                        response.write(u'Status: {0}'.format(access_response.status))
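The parsing step can also be pulled out into a small function, which makes it easy to try against sample data. This is a sketch under assumptions: the function name extract_statuses is made up for illustration, and the sample dictionaries only imitate the shape that the code above expects (a 'feed' object with a 'data' list, or an 'error' object). They are not real Graph API output.

```python
def extract_statuses(data):
    """Return (statuses, error) from a parsed Graph API style dictionary."""
    error = data.get('error')
    # Fall back to empty containers so that a missing key does not raise.
    statuses = data.get('feed', {}).get('data', [])
    # Keep only the two fields that the tutorial prints.
    items = [(s.get('message'), s.get('created_time')) for s in statuses]
    return items, error


if __name__ == '__main__':
    ok = {'feed': {'data': [{'message': 'Hello',
                             'created_time': '2013-01-01'}]}}
    failed = {'error': {'message': 'Invalid token'}}
    print(extract_statuses(ok))
    print(extract_statuses(failed))
```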

Do the same with Twitter.

                if result.provider.name == 'tw':
                    response.write('You are logged in with Twitter.<br />')
                    
                    # We will get the user's 5 most recent tweets.
                    url = 'https://api.twitter.com/1.1/statuses/user_timeline.json'
                    
                    # You can pass a dictionary of querystring parameters.
                    access_response = result.provider.access(url, {'count': 5})
                                            
                    # Parse response.
                    if access_response.status == 200:
                        if isinstance(access_response.data, list):
                            # Twitter returns the tweets as a JSON list.
                            response.write('Your 5 most recent tweets:')
                            for tweet in access_response.data:
                                text = tweet.get('text')
                                date = tweet.get('created_at')
                                
                                response.write(u'<h3>{0}</h3>'.format(text.replace(u'\u2026', '...')))
                                response.write(u'Tweeted on: {0}'.format(date))
                                
                        elif access_response.data.get('errors'):
                            response.write(u'Damn that error: {0}!'.format(
                                access_response.data.get('errors')))
                    else:
                        response.write('Damn that unknown error!<br />')
                        response.write(u'Status: {0}'.format(access_response.status))
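The Twitter branch distinguishes a list of tweets from a dictionary with an 'errors' key. A minimal sketch of that distinction follows. extract_tweets is a hypothetical helper, and the sample payloads are invented to match the shapes handled above.

```python
def extract_tweets(data):
    """Return (tweets, errors) from parsed user_timeline style data."""
    if isinstance(data, list):
        # A list is treated as success: one dictionary per tweet.
        tweets = [
            # Replace the ellipsis character, as the tutorial code does.
            ((t.get('text') or '').replace(u'\u2026', '...'),
             t.get('created_at'))
            for t in data
        ]
        return tweets, None
    # Anything else is treated as an error payload.
    errors = data.get('errors') if isinstance(data, dict) else None
    return [], errors


if __name__ == '__main__':
    print(extract_tweets([{'text': u'Hello\u2026', 'created_at': 'today'}]))
    print(extract_tweets({'errors': [{'message': 'Bad token'}]}))
```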

Now this is VERY IMPORTANT! The login view MUST return the same response instance which we passed to the WebObAdapter. Otherwise it just won’t work.

    return response

That’s it for the login view.

Now, just for convenience, so that we don’t have to enter all the URLs manually, let’s create a Home view with an OpenID form and links to the login view.

def home(request):
    return Response('''
        Login with <a href="login/fb">Facebook</a>.<br />
        Login with <a href="login/tw">Twitter</a>.<br />
        <form action="login/oi">
            <input type="text" name="id" value="me.yahoo.com" />
            <input type="submit" value="Authenticate With OpenID">
        </form>
    ''')

The last bit is to configure the application.

if __name__ == '__main__':
    config = Configurator()
    
    config.add_route('home', '/')
    config.add_view(home, route_name='home')
    
    config.add_route('login', '/login/{provider_name}')
    config.add_view(login, route_name='login')
    
    app = config.make_wsgi_app()
    server = make_server('127.0.0.1', 8080, app)
    server.serve_forever()

That’s it. Now just run the application and navigate your browser to http://127.0.0.1:8080 or http://localhost:8080.

$ python main.py

Tip

Some of the providers don’t support authorization from apps running on localhost. Probably the best solution is to use an arbitrary domain as an alias of the 127.0.0.1 IP address.

You can do this on UNIX systems by adding an alias to the /etc/hosts file.

    # /etc/hosts
    127.0.0.1    localhost
    127.0.0.1    yourlocalhostalias.com
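To confirm that the alias is in place, the hosts file can be checked with a few lines of Python. The sketch below parses hosts-file text rather than reading /etc/hosts directly, so that it can be tried anywhere. The function name alias_points_to and the sample text are illustrative assumptions.

```python
def alias_points_to(hosts_text, alias, address='127.0.0.1'):
    """Return True if hosts_text maps alias to address."""
    for line in hosts_text.splitlines():
        # Drop comments and surrounding whitespace.
        line = line.split('#', 1)[0].strip()
        if not line:
            continue
        fields = line.split()
        # The first field is the IP address, the rest are host names.
        if fields[0] == address and alias in fields[1:]:
            return True
    return False


if __name__ == '__main__':
    sample = '''
    # /etc/hosts
    127.0.0.1    localhost
    127.0.0.1    yourlocalhostalias.com
    '''
    print(alias_points_to(sample, 'yourlocalhostalias.com'))
```

On a real system the text could be read from /etc/hosts and passed to the function in the same way.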

And here is the complete app. Remember that you can download all the files we just created from GitHub.

# -*- coding: utf-8 -*-
# main.py

from wsgiref.simple_server import make_server
from pyramid.config import Configurator
from pyramid.response import Response
from authomatic import Authomatic
from authomatic.adapters import WebObAdapter

from config import CONFIG

authomatic = Authomatic(config=CONFIG, secret='some random secret string')


def login(request):
    
    # We will need the response to pass it to the WebObAdapter.
    response = Response()
    
    # Get the internal provider name URL variable.
    provider_name = request.matchdict.get('provider_name')
    
    # Start the login procedure.
    result = authomatic.login(WebObAdapter(request, response), provider_name)
    
    # Do not write anything to the response if there is no result!
    if result:
        # If there is result, the login procedure is over and we can write to response.
        response.write('<a href="..">Home</a>')
        
        if result.error:
            # Login procedure finished with an error.
            response.write(u'<h2>Damn that error: {0}</h2>'.format(result.error.message))
        
        elif result.user:
            # Hooray, we have the user!
            
            # OAuth 2.0 and OAuth 1.0a provide only limited user data on login,
            # so we need to update the user to get more info.
            if not (result.user.name and result.user.id):
                result.user.update()
            
            # Welcome the user.
            response.write(u'<h1>Hi {0}</h1>'.format(result.user.name))
            response.write(u'<h2>Your id is: {0}</h2>'.format(result.user.id))
            response.write(u'<h2>Your email is: {0}</h2>'.format(result.user.email))
            
            # Seems like we're done, but there's more we can do...
            
            # If there are credentials (only with an AuthorizationProvider),
            # we can access the user's protected resources.
            if result.user.credentials:
                
                # Each provider has its own specific API.
                if result.provider.name == 'fb':
                    response.write('You are logged in with Facebook.<br />')
                    
                    # We will access the user's 5 most recent statuses.
                    url = 'https://graph.facebook.com/{0}?fields=feed.limit(5)'
                    url = url.format(result.user.id)
                    
                    # Access user's protected resource.
                    access_response = result.provider.access(url)
                    
                    if access_response.status == 200:
                        # Parse response.
                        # Default to an empty dict so a missing 'feed' does not raise.
                        statuses = access_response.data.get('feed', {}).get('data')
                        error = access_response.data.get('error')
                        
                        if error:
                            response.write(u'Damn that error: {0}!'.format(error))
                        elif statuses:
                            response.write('Your 5 most recent statuses:<br />')
                            for message in statuses:
                                
                                text = message.get('message')
                                date = message.get('created_time')
                                
                                response.write(u'<h3>{0}</h3>'.format(text))
                                response.write(u'Posted on: {0}'.format(date))
                    else:
                        response.write('Damn that unknown error!<br />')
                        response.write(u'Status: {0}'.format(access_response.status))
                    
                if result.provider.name == 'tw':
                    response.write('You are logged in with Twitter.<br />')
                    
                    # We will get the user's 5 most recent tweets.
                    url = 'https://api.twitter.com/1.1/statuses/user_timeline.json'
                    
                    # You can pass a dictionary of querystring parameters.
                    access_response = result.provider.access(url, {'count': 5})
                                            
                    # Parse response.
                    if access_response.status == 200:
                        if isinstance(access_response.data, list):
                            # Twitter returns the tweets as a JSON list.
                            response.write('Your 5 most recent tweets:')
                            for tweet in access_response.data:
                                text = tweet.get('text')
                                date = tweet.get('created_at')
                                
                                response.write(u'<h3>{0}</h3>'.format(text.replace(u'\u2026', '...')))
                                response.write(u'Tweeted on: {0}'.format(date))
                                
                        elif access_response.data.get('errors'):
                            response.write(u'Damn that error: {0}!'.format(
                                access_response.data.get('errors')))
                    else:
                        response.write('Damn that unknown error!<br />')
                        response.write(u'Status: {0}'.format(access_response.status))
    
    # It won't work if you don't return the response
    return response


def home(request):
    return Response('''
        Login with <a href="login/fb">Facebook</a>.<br />
        Login with <a href="login/tw">Twitter</a>.<br />
        <form action="login/oi">
            <input type="text" name="id" value="me.yahoo.com" />
            <input type="submit" value="Authenticate With OpenID">
        </form>
    ''')


if __name__ == '__main__':
    config = Configurator()
    
    config.add_route('home', '/')
    config.add_view(home, route_name='home')
    
    config.add_route('login', '/login/{provider_name}')
    config.add_view(login, route_name='login')
    
    app = config.make_wsgi_app()
    server = make_server('127.0.0.1', 8080, app)
    server.serve_forever()