What You’ll Learn

What You’ll Need

In web applications, it is common to store and maintain persistent data in a database, allowing the data to be retrieved and manipulated efficiently. In addition to adding data to a database, you can retrieve, modify, or delete it.

The Flask web framework provides tools and features for creating web applications in Python. The Python standard library includes the sqlite3 module, which allows you to interact with an SQLite database without the need to install additional tools.

Requests is an elegant and simple HTTP library for Python that allows you to submit an HTTP request to a website and receive a response in return. It can be used with several request types, including GET, POST, PUT, PATCH, and DELETE. It is a popular choice when working with Representational State Transfer (REST) application programming interfaces (APIs).

In this tutorial, you will integrate an SQLite database into a simple application within the Flask web framework. You will use the route() decorator to create Flask view functions that turn regular Python functions into HTTP responses. A value passed to the route() decorator will designate which URL each view function responds to. These view functions can be thought of as API endpoints that can be consumed using a REST client. You will perform basic data manipulation with CRUD (create, read, update, and delete) techniques, using Python Requests as your REST client.

Although the tutorial makes use of an SQLite database and Flask web application, its primary focus is REST API interaction using Python Requests.

img.png

In this step, you are going to initialize the SQLite database, then verify its contents using sqlite3.

Create the Schema

Create a file named db_schema.sql in a folder in your Python 3.6+ environment, and add the following code:

DROP TABLE IF EXISTS records;

CREATE TABLE records (
    id INTEGER PRIMARY KEY,
    fname TEXT NOT NULL,
    lname TEXT NOT NULL,
    role TEXT NOT NULL
);

Create the Database

Create a script called new_db.py in the same folder, and add the following code:

import sqlite3

con = sqlite3.connect('employees.db')

with open('db_schema.sql') as f:
    con.executescript(f.read())

cur = con.cursor()

cur.execute("INSERT INTO records (fname, lname, role) VALUES (?, ?, ?)",
            ('Aisha', 'Kumar', 'Product Manager')
            )

cur.execute("INSERT INTO records (fname, lname, role) VALUES (?, ?, ?)",
            ('Qian', 'Chen', 'Cloud Engineer')
            )

cur.execute("INSERT INTO records (fname, lname, role) VALUES (?, ?, ?)",
            ('Kamali', 'Keita', 'Data Analyst')
            )

con.commit()
con.close()

A connection is established to the employees.db database using the sqlite3 connect() method. The db_schema.sql file is opened and its contents are executed using the connection object executescript() method, which creates the records table. The connection object requires a cursor to execute Structured Query Language (SQL) statements. SQL statements are executed using the cursor’s execute() method, which inserts some records into the records table.

Open an OS command prompt and run the script:

python new_db.py

When running the script, you may have to type python3 or something similar, depending on your environment setup. For simplicity, this tutorial will continue to refer to python when running scripts. Once you have run the script, verify that the database file employees.db has been created in the same folder.

Query the Database

Next, you are going to examine the contents of the database using sqlite3.

Start a Python Read Evaluate Print Loop (REPL) session from the same folder that you created the database in, and enter the following code:

import sqlite3

con = sqlite3.connect('employees.db')
cur = con.cursor()
records = cur.execute('select * from records').fetchall()

for record in records:
    print(f'Record {record[0]}\n\tFirst name: {record[1]}\n\tLast name: {record[2]}\n\tRole: {record[3]}\n')

The output should look like this:

img.png

In this step, you are going to create a simple web application and verify that it works.

Create a script called app.py, and add the following code:

import sqlite3
from flask import Flask, jsonify, request

app = Flask(__name__)


@app.route('/')
def welcome():
    return '<h1>Welcome to the Employee Records database</h1>'


if __name__== "__main__":
    app.run(debug=True, port=8000)

In this script, you import the Flask object from the flask package, then use it to create a flask application with the name app. The value / is passed to the @app.route() decorator to signify that this function will respond to web requests for the URL /. The welcome() view function returns the string Welcome to the Employee Records database. The run() method starts the application when the script is run.

Open an OS command prompt and run the script.

python app.py

The application runs on a simple, built-in web server that is sufficient for testing but not suitable for use in production. The server listens on port 5000 by default, although it has been changed to port 8000 in this tutorial. You may need to change it if port 8000 is already in use in your environment. The server output looks like this:

img.png

The web server only handles requests for a single resource at present. Open a browser and point it to http://localhost:8000 to view the resource. It should look like this:

img.png

To stop the server, press CTRL+C at the prompt where you started it.

In this step, you are going to add some routes to your application. For each route you add, you will consume the API endpoint that it provides, using Python Requests.

First, edit app.py, and insert the following code directly below app = Flask(\_\_name\_\_):

def db_connect():
    con = sqlite3.connect('employees.db')
    return con

Then, insert a new route directly below the existing route:

@app.route('/employee', methods=['GET'])
def get_emps():
    con = db_connect()
    cur = con.cursor()
    rows = cur.execute('select * from records').fetchall()
    con.close()
    return jsonify(rows)

The contents of app.py should look like this:

img.png

The cursor (cur) executes an SQL statement to retrieve all the data from the records table, and the jsonify() function converts the output of the get_emps() view function to a JSON response object.

The server automatically restarts after saving changes to app.py, which is a benefit of running it in debug mode. Alternatively, you can manually stop the server by pressing CTRL-C, then restart it by running the app.py script again.

Type in the following commands from a Python REPL session:

import requests

url = 'http://localhost:8000/employee'
r = requests.get(url)
records = r.json()

for record in records:
    print(f'Record {record[0]}\n\tFirst name: {record[1]}\n\t Last name: {record[2]}\n\t Role: {record[3]}\n')

This route returns all the records. The server output displays the time and date that the GET request took place:

img.png

One way to retrieve a single record is to modify the SQL statement. Insert the following code after the previous route:

@app.route('/employee/<empid>', methods=['GET'])
def get_emp(empid):
    con = db_connect()
    cur = con.cursor()
    row = cur.execute('select * from records WHERE id = ?', empid).fetchone()
    con.close()
    return jsonify(row)

Type in the following commands from the Python REPL:

import requests

url = 'http://localhost:8000/employee/1'
r = requests.get(url)
record = r.json()

print(f"Record {record[0]}\n\tFirst name: {record[1]}\n\t Last name: {record[2]}\n\t Role: {record[3]}\n")

Next, you are going to add a new record. Insert the following code after the previous route:

@app.route('/employee/', methods=['POST'])
def add_emp():
    con = db_connect()
    cur = con.cursor()
    new_emp = request.get_json()
    id = new_emp['id']
    fname = new_emp['fname']
    lname = new_emp['lname']
    role = new_emp['role']
    cur.execute('INSERT INTO records (id, fname, lname, role) VALUES (?, ?, ?, ?)', (id, fname, lname, role))
    con.commit()
    con.close()
    return new_emp

To access the incoming data in Flask, you have to use the request object. Once you have imported the request object from the Flask library (from flask import request), you can use it in any of your view functions. The request.get_json() method converts the incoming JSON data into a Python dictionary.

Type in the following Python commands to create a new record:

import json
import requests

url = 'http://localhost:8000/employee/'
data = json.dumps({'id': 4, 'fname': 'Sophia', 'lname': 'Moreau', 'role': 'CFO'})
headers = {'content-type': 'application/json'}
r = requests.post(url, data=data, headers=headers)
record = r.json()

print(f"Record {record['id']}\n\tFirst name: {record['fname']}\n\t Last name: {record['lname']}\n\t Role: {record['role']}\n")

The json.dumps() method converts the Python dictionary into a JSON string, and the requests data keyword argument is used to submit the JSON-encoded data.

Next, you are going to modify this record. Insert the following code after the previous route:

@app.route('/employee/<empid>', methods=['PUT', 'PATCH'])
def mod_emp(empid):
    con = db_connect()
    cur = con.cursor()
    emp = cur.execute('SELECT * FROM records WHERE id = ?', (empid,)).fetchone()
    fname = emp[1]
    lname = emp[2]
    role = emp[3]
    update_emp = request.get_json()
    if 'fname' in update_emp:
        fname = update_emp['fname']
    if 'lname' in update_emp:
        lname = update_emp['lname']
    if 'role' in update_emp:
        role = update_emp['role']

    cur.execute('UPDATE records SET fname = ?, lname = ?, role = ?'' WHERE id = ?', (fname, lname, role, empid))
    con.commit()
    con.close()
    return f'Record {empid} was successfully updated!'

Type in the following Python commands to modify the record:

import requests

url = 'http://localhost:8000/employee/4'
data = json.dumps({'role': 'CEO'})
headers = {'content-type': 'application/json'}
r = requests.put(url, data=data, headers=headers)
print(r.text)
r = requests.get(url)
record = r.json()

print(f'\n Updated record {record[0]}\n\tFirst name: {record[1]}\n\t Last name: {record[2]}\n\t Role: {record[3]}\n')

Next, you are going to update the same record using an alternative method. Change the data to read data = json.dumps({'fname': 'Lisette'}). Repeat the commands above, but replace put with patch (r = requests.patch(url, data=data, headers=headers). Verify that the record is updated.

Now, you are going to delete the newly updated record. Insert the following code after the previous route:

@app.route('/employee/<empid>', methods=['DELETE'])
def del_emp(empid):
    con = db_connect()
    cur = con.cursor()
    cur.execute('DELETE FROM records WHERE id = ?', (empid,))
    con.commit()
    con.close()
    return f'Record {empid} was successfully deleted!'

Type in the following Python commands to delete the record:

import requests

url = 'http://localhost:8000/employee/4'
r = requests.delete(url)
print(r.text)
url = 'http://localhost:8000/employee'
r = requests.get(url)
records = r.json()

print('\nHere are the remaining records:\n')
for record in records:
    print(f'Record {record[0]}\n\tFirst name: {record[1]}\n\t Last name: {record[2]}\n\t Role: {record[3]}\n')

If you want to check which request type(s) a resource supports, you can use the options method. Type in the following Python commands:

import requests

url = "http://localhost:8000/employee"
r = requests.options(url)
options = r.headers['allow']
print(f'The supported options for {url} are: {options}.')

url = "http://localhost:8000/employee/"
r = requests.options(url)
options = r.headers['allow']
print(f'The supported options for {url} are: {options}.')

url = "http://localhost:8000/employee/1"
r = requests.options(url)
options = r.headers['allow']
print(f'The supported options for {url} are: {options}.')

The supported options may not be the same for different resources. For example, you must use the URL http://localhost:8000/employee/ to create a record for this application. You will not be able to create, update, or delete records when using the URL http://localhost:8000/employee/, although you will be able to read them.

Finally, we are going to perform some searches.

Insert the following code after the previous route:

@app.route('/employee/find', methods=['GET'])
def find_emp():
    con = db_connect()
    cur = con.cursor()
    if request.args.get('id'):
        emp_id = request.args.get('id')
        row = cur.execute('select * from records WHERE id = ?', (emp_id,)).fetchall()
    elif request.args.get('fname'):
        emp_fname = request.args.get('fname')
        row = cur.execute('select * from records WHERE fname = ?', (emp_fname,)).fetchall()
    elif request.args.get('lname'):
        emp_lname = request.args.get('lname')
        row = cur.execute('select * from records WHERE lname = ?', (emp_lname,)).fetchall()
    elif request.args.get('role'):
        emp_role = request.args.get('role')
        row = cur.execute('select * from records WHERE role = ?', (emp_role,)).fetchall()
    con.close()
    return jsonify(row)

When performing a search, it is common to add some data to the URL query string. This data would be given as one or more key-value pairs appended to the URL after a question mark—for example, https://localhost:8000/employee/find?fname=Aisha. The key-value pairs can be thought of as arguments, and the Flask request object can access them by calling args.get(). For example, request.args.get('fname') can be used to access the value assigned to the fname argument.

Type in the following Python commands to search the records:

import requests

base_url = "http://localhost:8000/employee/find"
search_url = '?fname=Aisha'
url = base_url + search_url
r = requests.get(url)
record_1 = r.json()

print('The record you searched for is:\n')
print(f'Record {record_1[0][0]}\n\tFirst name: {record_1[0][1]}\n\t Last name: {record_1[0][2]}\n\t Role: {record_1[0][3]}\n')

search_url = '?fname=Qian&lname=Chen'
url = base_url + search_url
r = requests.get(url)
record_2 = r.json()

print('The record you searched for is:\n')
print(f'Record {record_2[0][0]}\n\tFirst name: {record_2[0][1]}\n\t Last name: {record_2[0][2]}\n\t Role: {record_2[0][3]}\n')

Although multiple arguments can be combined by separating each one with an ampersand, the URL query string can become quite long and difficult to configure. Requests provides a simpler way of adding the arguments as a dictionary of strings, using the params keyword argument.

Type the following code to retrieve a record, using multiple arguments and the params keyword argument:

import requests

base_url = "http://localhost:8000/employee/find"
params = {'fname': 'Kamali', 'lname': 'Keita'}
r = requests.get(url=base_url, params=params)
record = r.json()
url = r.url
print('The record you searched for is:\n')
print(f'Record {record[0][0]}\n\tFirst name: {record[0][1]}\n\t Last name: {record[0][2]}\n\t Role: {record[0][3]}\n')
print(f'The submitted url is: {url}')

Even though the arguments were added as a dictionary of strings, the URL has been encoded correctly.

The entire app.py script is here for your reference. (Please refer to step 3 for the database setup script.)

import sqlite3
from flask import Flask, jsonify, request

app = Flask(__name__)


def db_connect():
    con = sqlite3.connect('employees.db')
    return con


@app.route('/')
def welcome():
    return '<h1>Welcome to the Employee Records database</h1>'


@app.route('/employee', methods=['GET'])
def get_emps():
    con = db_connect()
    cur = con.cursor()
    rows = cur.execute('select * from records').fetchall()
    con.close()
    return jsonify(rows)


@app.route('/employee/<empid>', methods=['GET'])
def get_emp(empid):
    con = db_connect()
    cur = con.cursor()
    row = cur.execute('select * from records WHERE id = ?', empid).fetchone()
    con.close()
    return jsonify(row)


@app.route('/employee/', methods=['POST'])
def add_emp():
    con = db_connect()
    cur = con.cursor()
    new_emp = request.get_json()
    id = new_emp['id']
    fname = new_emp['fname']
    lname = new_emp['lname']
    role = new_emp['role']
    cur.execute('INSERT INTO records (id, fname, lname, role) VALUES (?, ?, ?, ?)', (id, fname, lname, role))
    con.commit()
    con.close()
    return new_emp


@app.route('/employee/<empid>', methods=['PUT', 'PATCH'])
def mod_emp(empid):
    con = db_connect()
    cur = con.cursor()
    emp = cur.execute('SELECT * FROM records WHERE id = ?', (empid,)).fetchone()
    fname = emp[1]
    lname = emp[2]
    role = emp[3]
    update_emp = request.get_json()
    if 'fname' in update_emp:
        fname = update_emp['fname']
    if 'lname' in update_emp:
        lname = update_emp['lname']
    if 'role' in update_emp:
        role = update_emp['role']

    cur.execute('UPDATE records SET fname = ?, lname = ?, role = ?'' WHERE id = ?', (fname, lname, role, empid))
    con.commit()
    con.close()
    return f'Record {empid} was successfully updated!'


@app.route('/employee/<empid>', methods=['DELETE'])
def del_emp(empid):
    con = db_connect()
    cur = con.cursor()
    cur.execute('DELETE FROM records WHERE id = ?', (empid,))
    con.commit()
    con.close()
    return f'Record {empid} was successfully deleted!'


@app.route('/employee/find', methods=['GET'])
def find_emp():
    con = db_connect()
    cur = con.cursor()
    if request.args.get('id'):
        emp_id = request.args.get('id')
        row = cur.execute('select * from records WHERE id = ?', (emp_id,)).fetchall()
    elif request.args.get('fname'):
        emp_fname = request.args.get('fname')
        row = cur.execute('select * from records WHERE fname = ?', (emp_fname,)).fetchall()
    elif request.args.get('lname'):
        emp_lname = request.args.get('lname')
        row = cur.execute('select * from records WHERE lname = ?', (emp_lname,)).fetchall()
    elif request.args.get('role'):
        emp_role = request.args.get('role')
        row = cur.execute('select * from records WHERE role = ?', (emp_role,)).fetchall()
    con.close()
    return jsonify(row)


if __name__ == "__main__":
    app.run(debug=True, port=8000)

Congratulations! You have successfully completed this tutorial.

You learned:

Learn More

Continue your Education