The full repo is here, but I'll post the most important extracts below.
Background
I'm trying to get the hang of blockchain and public key cryptography, so I thought it would be fun to create a program for drawing up and promulgating government decrees - the government in question being one that I've just made up.
The system is intended to work like this:
- A government official in possession of the necessary passwords draws up a decree, stamps it with a digital stamp, and then adds it the register.
- A private citizen then encounters decrees while going about his business:
- If he encounters a decree in isolation, he can judge its authenticity using its stamp and the public key.
- If he has access to the register, he can check that the government hasn't tried to hide an earlier decree by checking the chain of hashes.
The Code
This is the code in which I make and verify my stamps, the certificates which authenticate decrees:
### This code defines two classes: one of which produces a digital stamp for
### documents issued by the Chancellor of Cyprus, and the other of which
### verfies the same.
# Imports.
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from pathlib import Path
import datetime, getpass, os
# Local constants.
path_to_digistamp = str(Path.home())+"/chancery-b/digistamp/"
path_to_private_key = path_to_digistamp+"stamp_private_key.pem"
path_to_public_key = path_to_digistamp+"stamp_public_key.pem"
public_exponent = 65537
key_size = 2048
encoding = "utf-8"
################
# MAIN CLASSES #
################
# A class which produces a string which testifies as to the authenticity of
# a given document.
class Stamp_Machine:
def __init__(self, data):
if os.path.exists(path_to_private_key) == False:
raise Exception("No private key on disk.")
self.private_key = load_private_key()
self.data = data
# Ronseal.
def make_stamp(self):
data_bytes = bytes(self.data, encoding)
result_bytes = self.private_key.sign(
data_bytes,
padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH),
hashes.SHA256())
result = result_bytes.hex()
return result
# A class which allows the user to verify a stamp produced as above.
class Verifier:
def __init__(self, stamp, data):
if isinstance(stamp, str) == False:
raise Exception("")
self.stamp_str = stamp
self.stamp_bytes = bytes.fromhex(stamp)
self.public_key = load_public_key()
self.data = data
# Decide whether the stamp in question is authentic or not.
def verify(self):
data_bytes = bytes(self.data, encoding)
try:
self.public_key.verify(
self.stamp_bytes,
data_bytes,
padding.PSS(mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH),
hashes.SHA256())
except InvalidSignature:
return False
else:
return True
####################
# HELPER FUNCTIONS #
####################
# Get a password from the user, and convert it into bytes.
def get_bytes_password():
password = getpass.getpass(prompt="Digistamp password: ")
result = bytes(password, encoding)
return result
# Get a NEW password from the user, and convert it into bytes.
def get_bytes_password_new():
password = getpass.getpass(prompt="Digistamp password: ")
password_ = getpass.getpass(prompt="Confirm password: ")
if password != password_:
raise Exception("Passwords do not match.")
result = bytes(password, encoding)
return result
# Generate a public key from a private key object.
def generate_public_key(private_key):
public_key = private_key.public_key()
pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo)
f = open(path_to_public_key, "wb")
f.write(pem)
f.close()
# Generate a new private and public key.
def generate_keys():
if os.path.exists(path_to_private_key):
raise Exception("Private key file already exists.")
bpw = get_bytes_password_new()
private_key = rsa.generate_private_key(public_exponent=public_exponent,
key_size=key_size,
backend=default_backend())
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=
serialization.BestAvailableEncryption(bpw))
f = open(path_to_private_key, "wb")
f.write(pem)
f.close()
generate_public_key(private_key)
# Load the private key from its file.
def load_private_key():
bpw = get_bytes_password()
key_file = open(path_to_private_key, "rb")
result = serialization.load_pem_private_key(key_file.read(),
password=bpw,
backend=default_backend())
return result
# Load the public key from its file.
def load_public_key():
key_file = open(path_to_public_key, "rb")
result = serialization.load_pem_public_key(key_file.read(),
backend=default_backend())
return result
###########
# TESTING #
###########
# Run the unit tests.
def test():
stamp = Stamp_Machine("123").make_stamp()
assert(Verifier(stamp, "123").verify())
assert(Verifier(stamp, "abc").verify() == False)
print("Tests passed!")
###################
# RUN AND WRAP UP #
###################
def run():
test()
if __name__ == "__main__":
run()
This is the code in which a decree-in-the-making is uploaded to the register:
### This code defines a class, which uploads a record to the ledger.
# Imports.
import datetime, hashlib, os, sqlite3
# Local imports.
from digistamp.digistamp import Stamp_Machine
import ordinance_inputs
# Constants.
encoding = "utf-8"
##############
# MAIN CLASS #
##############
# The class in question.
class Uploader:
def __init__(self):
self.connection = None
self.c = None
self.block = Block_of_Ledger()
# Ronseal.
def make_connection(self):
self.connection = sqlite3.connect("ledger.db")
self.connection.row_factory = dict_factory
self.c = self.connection.cursor()
# Ronseal.
def close_connection(self):
self.connection.close()
# Add the ordinal and the previous block's hash to the block.
def add_ordinal_and_prev(self):
self.make_connection()
query = "SELECT * FROM Block ORDER BY ordinal DESC;"
self.c.execute(query)
result = self.c.fetchone()
self.close_connection()
if result is None:
self.block.set_ordinal(1)
self.block.set_prev("genesis")
else:
self.block.set_ordinal(result["ordinal"]+1)
self.block.set_prev(result["hash"])
# Add the hash to the present block.
def add_hash(self):
m = hashlib.sha256()
m.update(bytes(self.block.ordinal))
m.update(bytes(self.block.ordinance_type, encoding))
m.update(bytes(self.block.latex, encoding))
m.update(bytes(self.block.year))
m.update(bytes(self.block.month))
m.update(bytes(self.block.day))
if self.block.annexe:
m.update(self.block.annexe)
m.update(bytes(self.block.prev, encoding))
self.block.set_the_hash(m.hexdigest())
# Add a new block to the legder.
def add_new_block(self):
new_block_tuple = (self.block.ordinal, self.block.ordinance_type,
self.block.latex, self.block.year,
self.block.month, self.block.day,
self.block.stamp, self.block.annexe,
self.block.prev, self.block.the_hash)
query = ("INSERT INTO Block (ordinal, ordinanceType, latex, year, "+
" month, day, stamp, annexe, prev, "+
" hash) "+
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);")
self.make_connection()
self.c.execute(query, new_block_tuple)
self.connection.commit()
self.close_connection()
# Construct a new block and add it to the chain.
def upload(self):
self.add_ordinal_and_prev()
self.add_hash()
self.add_new_block()
################################
# HELPER CLASSES AND FUNCTIONS #
################################
# A class to hold the properties of a block of the ledger.
class Block_of_Ledger:
def __init__(self):
dt = datetime.datetime.now()
self.ordinal = None
self.ordinance_type = ordinance_inputs.ordinance_type
self.latex = ordinance_inputs.latex
self.year = dt.year
self.month = dt.month
self.day = dt.day
self.annexe = annexe_to_bytes()
self.prev = None
self.the_hash = None
self.stamp = None
# Assign a value to the "ordinal" field of this object.
def set_ordinal(self, ordinal):
self.ordinal = ordinal
# Assign a value to the "prev" field of this object.
def set_prev(self, prev):
self.prev = prev
# Assign a value to the "the_hash" field of this object.
def set_the_hash(self, the_hash):
self.the_hash = the_hash
self.stamp = Stamp_Machine(self.the_hash).make_stamp()
# A function which allows queries to return dictionaries, rather than the
# default tuples.
def dict_factory(cursor, row):
d = {}
for idx, col in enumerate(cursor.description):
d[col[0]] = row[idx]
return d
# Convert the annexe folder to a zip, load the bytes thereof into memory,
# and then delete the zip.
def annexe_to_bytes():
if len(os.listdir("annexe/")) == 0:
return None
os.system("zip -r annexe.zip annexe/")
f = open("annexe.zip", "rb")
result = f.read()
f.close()
os.system("rm annexe.zip")
return result
###################
# RUN AND WRAP UP #
###################
def run():
Uploader().upload()
if __name__ == "__main__":
run()
And this is the code in which decrees are extracted from the register and put into something more human-readable, i.e. a PDF:
### This code defines a class which takes a given record in the ledger and
### converts it into a directory.
# Imports.
from uploader import dict_factory
import os, sqlite3, sys
# Local imports.
from digistamp.digistamp import Verifier
# Constants.
month_names = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug",
"Sep", "Oct", "Nov", "Dec"]
##############
# MAIN CLASS #
##############
# The class in question.
class Extractor:
def __init__(self, ordinal):
self.ordinal = ordinal
self.block = self.fetch_block()
self.main_tex = self.make_main_tex()
# Fetch the block matching this object's ordinal from the ledger.
def fetch_block(self):
connection = sqlite3.connect("ledger.db")
connection.row_factory = dict_factory
c = connection.cursor()
query = "SELECT * FROM Block WHERE ordinal = ?;"
c.execute(query, (self.ordinal,))
result = c.fetchone()
connection.close()
if result is None:
raise Exception("No block with ordinal "+str(self.ordinal)+
" in the ledger.")
return result
# Get the base for main.tex, given the type of the Ordinance.
def get_base(self):
if self.block["ordinanceType"] == "Decleration":
path_to_base = "latexery/base_decleration.tex"
elif self.block["ordinanceType"] == "Order":
path_to_base = "latexery/base_order.tex"
else:
raise Exception("Invalid ordinanceType: "+
self.block["ordinanceType"])
f = open(path_to_base, "r")
result = f.read()
f.close()
return result
# Make the code for main.tex, which will then be used build our PDF.
def make_main_tex(self):
day_str = str(self.block["day"])
if len(day_str) == 0:
day_str = "0"+day_str
packed_ordinal = str(self.ordinal)
while len(packed_ordinal) < 3:
packed_ordinal = "0"+packed_ordinal
month_str = month_names[self.block["month"]-1]
result = self.get_base()
result = result.replace("#BODY", self.block["latex"])
result = result.replace("#DAY_STR", day_str)
result = result.replace("#MONTH_STR", month_str)
result = result.replace("#YEAR", str(self.block["year"]))
result = result.replace("#PACKED_ORDINAL", packed_ordinal)
result = result.replace("#DIGISTAMP", self.block["stamp"])
return result
# Check that the block isn't a forgery.
def authenticate(self):
self.compare_hashes()
self.verify_stamp()
# Compare the "prev" field of this block with the hash of the previous.
def compare_hashes(self):
if self.ordinal == 1:
if self.block["prev"] != "genesis":
raise Exception("Block with ordinal=1 should be the "+
"genesis block.")
else:
return
prev_ordinal = self.ordinal-1
connection = sqlite3.connect("ledger.db")
c = connection.cursor()
query = "SELECT hash FROM Block WHERE ordinal = ?;"
c.execute(query, (prev_ordinal,))
extract = c.fetchone()
connection.close()
prev_hash = extract["0"]
if prev_hash != self.block["prev"]:
raise Exception("Block with ordinal="+str(self.ordinal)+" is "+
"not authentic: \"prev\" does not match "+
"previous \"hash\".")
# Check that this block's stamp is in order.
def verify_stamp(self):
v = Verifier(self.block["stamp"], self.block["hash"])
if v.verify() == False:
raise Exception("Block with ordinal="+str(self.ordinal)+" is "+
"not authentic: \"prev\" does not match "+
"previous \"hash\".")
# Ronseal.
def write_main_tex(self):
f = open("latexery/main.tex", "w")
f.write(self.main_tex)
f.close()
# Compile the PDF.
def compile_main_tex(self):
script = ("cd latexery/\n"+
"pdflatex main.tex")
os.system(script)
# Create the directory, and copy the PDF into it.
def create_and_copy(self):
script1 = ("cd extracts/\n"+
"mkdir "+str(self.ordinal)+"/")
script2 = "cp latexery/main.pdf extracts/"+str(self.ordinal)+"/"
if os.path.isdir("extracts/"+str(self.ordinal)+"/"):
os.system("rm -r extracts/"+str(self.ordinal)+"/")
os.system(script1)
os.system(script2)
# Write annexe to a file in the directory.
def write_annexe_zip(self):
if self.block["annexe"] is None:
return
f = open("extracts/"+str(self.ordinal)+"/annexe.zip", "wb")
f.write(self.block["annexe"])
f.close()
# Do the thing.
def extract(self):
self.authenticate()
self.write_main_tex()
self.compile_main_tex()
self.create_and_copy()
self.write_annexe_zip()
# Ronseal.
def zip_and_delete(self):
script = ("cd extracts/\n"+
"zip -r ordinance_"+str(self.ordinal)+".zip "+
str(self.ordinal)+"/\n"+
"rm -r "+str(self.ordinal)+"/")
if os.path.exists("extracts/ordinance_"+str(self.ordinal)+".zip"):
os.system("rm extracts/ordinance_"+str(self.ordinal)+".zip")
os.system(script)
###########
# TESTING #
###########
# Run a demonstration.
def demo():
e = Extractor(1)
e.extract()
#e.zip_and_delete()
###################
# RUN AND WRAP UP #
###################
def run():
if len(sys.argv) == 2:
e = Extractor(int(sys.argv[1]))
e.extract()
else:
print("Please run me with exactly one argument, the number of the "+
"Ordinance you wish to extract.")
if __name__ == "__main__":
run()
What I'd Like to Know
- Am I using blockchain and public key cryptography in a sane fashion? Or have I misunderstood some of the fundamental concepts involved?
- Are there any glaring security issues? Is there any obvious exploit in which someone could start manufacturing convincing forgeries?
- How's my Python?