crypto.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. import base64
  2. from Crypto.PublicKey import RSA
  3. from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
  4. from Crypto.Cipher import AES
  5. from functools import wraps
  6. from utils.http import make_json_response
  7. from urllib import parse
  8. from django.http import JsonResponse
  9. import json
  10. IV = '16-Bytes--String'
  11. with open('public.rsa') as f:
  12. key = f.read()
  13. public_key = RSA.import_key(key)
  14. public_cipher = PKCS1_cipher.new(public_key)
  15. print(public_key.exportKey().decode(encoding='utf-8'))
  16. with open('private.rsa') as f:
  17. key = f.read()
  18. private_key = RSA.import_key(key)
  19. private_cipher = PKCS1_cipher.new(private_key)
  20. # print(private_key.exportKey().decode(encoding='utf-8'))
  21. # 安全传输decorator
  22. def secure_transport(view_func):
  23. @wraps(view_func)
  24. def _wrapped_view(request, *args, **kwargs):
  25. data = request.POST
  26. enc_key = data.get('enc_key')
  27. cipher_text = data.get('cipher_text')
  28. if not enc_key or not cipher_text:
  29. print('无加密')
  30. return view_func(request, *args, **kwargs)
  31. aes_key = private_cipher.decrypt(base64.b64decode(enc_key.encode('utf-8')), b'error').decode('utf-8')
  32. print(f'key={aes_key}')
  33. def get_aes_cipher():
  34. return AES.new(aes_key.encode('utf-8'), AES.MODE_CBC, IV.encode('utf-8'))
  35. decrypted = get_aes_cipher().decrypt(base64.b64decode(cipher_text.encode('utf-8')))
  36. # print(decrypted)
  37. decrypted = decrypted[:-decrypted[-1]]
  38. # print(decrypted)
  39. plain_text = decrypted.decode('utf-8')
  40. print(plain_text)
  41. try:
  42. loaded = json.loads(plain_text)
  43. except:
  44. print('不是json')
  45. loaded = {}
  46. plain_text = parse.unquote(plain_text)
  47. print(plain_text)
  48. for p in map(lambda s: s.split('='), plain_text.split('&')):
  49. loaded[p[0]] = p[1]
  50. dec_request = request
  51. dec_request.POST = {'key': key, **request.POST, **loaded}
  52. raw_response = view_func(dec_request, *args, **kwargs)
  53. if not isinstance(raw_response, JsonResponse):
  54. return raw_response
  55. content = json.dumps({'data': json.loads(raw_response.content)}).encode('utf-8')
  56. padding = 16 - len(content) % 16
  57. content += bytes([padding] * padding)
  58. print(content)
  59. enc_content = base64.b64encode(get_aes_cipher().encrypt(content)).decode('utf-8')
  60. print(enc_content)
  61. print(get_aes_cipher().decrypt(base64.b64decode(enc_content.encode('utf-8'))))
  62. return make_json_response(enc_content=enc_content)
  63. return _wrapped_view
  64. def test():
  65. plain_text = '{"username": "user1"}'
  66. encrypted = public_cipher.encrypt(bytes(plain_text.encode('utf8')))
  67. cipher_text = base64.b64encode(encrypted)
  68. print(cipher_text.decode('utf8'))
  69. decrypted = private_cipher.decrypt(base64.b64decode(cipher_text), b'error')
  70. print(decrypted.decode('utf8'))
  71. # print(public_key.exportKey().decode('utf-8'))
  72. # random_generator = Random.new().read
  73. # rsa = RSA.generate(2048, random_generator)
  74. # private_key = rsa.exportKey()
  75. # print(private_key.decode('utf-8'))