Executando verificação de segurança...
3

Obrigado por compartilhar esse assunto ,só acho que poderia aprofundar mais o exemplo talvez com o cliente inteiro , pois nem todos usam o jitter para aleatorizar um valor, é uma boa opção.

Carregando publicação patrocinada...
1

API

from flask import Flask, request, jsonify
import random

app = Flask(__name__)

# Armazenamento em memória para simular um banco de dados
data_store = {}


@app.route("/resource", methods=["POST"])
def create_resource():
    resource_id = request.json.get("idempotencyKey")
    value = request.json.get("value")

    # adicionando um erro em 50% das chamadas, para testar o exponential backoff
    if random.randint(0,100) <= 50:
        return jsonify("fail"), 500
        
    # Operação idempotente: Se o recurso já existe, retorna-o
    if resource_id in data_store:
        return jsonify(data_store[resource_id]), 200

    # Caso contrário, cria o novo recurso
    data_store[resource_id] = {"idempotencyKey": resource_id, "value": value}
    return jsonify(data_store[resource_id]), 201


if __name__ == "__main__":
    app.run(debug=True)

Consumidor que irá chamar a API

import time
import requests
import random

def test_idempotent_write(api_url, payload, headers=None):
    max_attempts = 5
    backoff_factor = 1

    for attempt in range(1, max_attempts + 1):
        response = requests.post(api_url, json=payload, headers=headers)
        if response.status_code == 201 or response.status_code == 200:
            return response.json(), response.status_code

        print("retrying")
        # Exponential backoff
        sleep_time = backoff_factor * (2 ** (attempt - 1)) + random.uniform(0,1)
        time.sleep(sleep_time)

    return None, response.status_code

# Exemplo de uso
api_url = "http://127.0.0.1:5000/resource"
payload = {
    "idempotencyKey": "123",
    "value": "example"
}
headers = {
    "Content-Type": "application/json"
}

response_body, status_code = test_idempotent_write(api_url, payload, headers)
print(f"Response body: {response_body}")
print(f"Status code: {status_code}")

PAra testar execute primeiro a API em um terminal/prompt. Abra outro terminal/prompt e execute o segundo script.