Adiciona controle de taxa com TokenBucketRateLimiter e refatora ProcessarMedicoesUseCase para melhorar a concorrência e o tratamento de erros

Requisições não são realizadas de forma recursiva. O controle de páginação é realizado em ProcessarDiaAsync juntamente com o controle de erros e retry's
This commit is contained in:
Adriano Serighelli 2025-11-14 09:38:44 -03:00
parent 8826ba3a31
commit 817d542631
3 changed files with 175 additions and 107 deletions

View File

@ -2,16 +2,18 @@ using System.Collections.Concurrent;
using System.Globalization;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Channels;
using System.Xml.Linq;
using Domain;
using Infrastructure;
namespace Application
{
public class ProcessarMedicoesUseCase
public class ProcessarMedicoesUseCase : IDisposable
{
private readonly IPostgresRepository _postgresRepository;
private readonly IAccessRepository _accessRepository;
private readonly TokenBucketRateLimiter _rateLimiter;
public ProcessarMedicoesUseCase(
IPostgresRepository postgresRepository,
@ -19,6 +21,13 @@ namespace Application
{
_postgresRepository = postgresRepository;
_accessRepository = accessRepository;
// 400 requisições por minuto
_rateLimiter = new TokenBucketRateLimiter(4000, capacity: 4000);
}
public void Dispose()
{
_rateLimiter?.Dispose();
}
public async Task ExecuteAsync(DateTime dataIni, DateTime dataFim, string caminhoLog, CancellationToken ct)
@ -32,7 +41,7 @@ namespace Application
await Parallel.ForEachAsync(perfis, async (perfil, ctPerfil) =>
{
Console.WriteLine($"{DateTime.Now}: Iniciado ponto {perfil.CodigoSCDE}");
//Console.WriteLine($"{DateTime.Now}: Iniciado ponto {perfil.CodigoSCDE}");
if (perfil.Codigo5Minutos == "0" || perfil.Codigo5Minutos == string.Empty || perfil.Codigo5Minutos == null)
{
Console.WriteLine($"Pular {perfil.CodigoSCDE} - (cod 5 min pendente)");
@ -42,7 +51,7 @@ namespace Application
var existentes = await ObterMedicoesComRetry(perfil.CodigoSCDE, dataIni, dataFim, ctPerfil, errosPersistentes);
// Paraleliza os dias deste perfil; o semáforo limita as requisições simultâneas
// Paraleliza os dias deste perfil; o semáforo limita as requisições simultâneas
await Parallel.ForEachAsync(datas, ctPerfil, async (dia, ctDia) =>
{
try
@ -55,10 +64,10 @@ namespace Application
}
});
Console.WriteLine($"{DateTime.Now}: Finalizado ponto {perfil.CodigoSCDE}");
//Console.WriteLine($"{DateTime.Now}: Finalizado ponto {perfil.CodigoSCDE}");
});
// Cabeçalho do log
// Cabeçalho do log
var linhasLog = new List<string> { "Perfil;Ponto;DiaNum;Status;Mensagem;Inseridos;Atualizados" };
linhasLog.AddRange(operacoesLog);
linhasLog.AddRange(errosPersistentes);
@ -95,7 +104,8 @@ namespace Application
private async Task ProcessarDiaAsync(
Perfil perfil,
DateTime dia,
IDictionary<(string, double, int), Medicao> existentes,
IDictionary<(string, double, int),
Medicao> existentes,
Uri endpoint,
ConcurrentBag<string> errosPersistentes,
ConcurrentBag<string> operacoesLog,
@ -103,118 +113,129 @@ namespace Application
{
if (perfil.DataDeMigracao > dia)
{
Console.WriteLine($"Pular {perfil.CodigoSCDE} - {dia.ToShortDateString()} (antes da migração)");
errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};Fora da data de migração {perfil.DataDeMigracao} x {dia}");
Console.WriteLine($"Pular {perfil.CodigoSCDE} - {dia.ToShortDateString()} (antes da migração)");
errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};Fora da data de migração {perfil.DataDeMigracao} x {dia}");
return;
}
int tentativas = 0;
bool sucesso = false;
while (tentativas < 5 && !sucesso)
// Acumulador de medidas (todas as páginas)
var acumulador = new List<XElement>();
int paginaAtual = 1;
int totalPaginas = 1;
while (paginaAtual <= totalPaginas)
{
try
int tentativas = 0;
bool sucesso = false;
while (tentativas < 5 && !sucesso)
{
string payload = Xml_requisicao(dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, 1);
var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml");
HttpResponseMessage response;
string resposta;
using (var client = CreateHttpClient())
try
{
response = await client.PostAsync(endpoint, conteudo, ct);
resposta = await response.Content.ReadAsStringAsync();
}
string payload = Xml_requisicao(dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, paginaAtual);
var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml");
if ((int)response.StatusCode >= 400)
{
try
// Aguarda token do rate limiter antes de cada requisição
await _rateLimiter.WaitAsync(ct);
HttpResponseMessage response;
string resposta;
using (var client = CreateHttpClient())
{
SoapHelper.VerificarRespostaSOAP(resposta);
response = await client.PostAsync(endpoint, conteudo, ct);
resposta = await response.Content.ReadAsStringAsync();
}
catch (SoapFaultException ex)
if ((int)response.StatusCode >= 400)
{
if (ex.ErrorCode == "2003")
try
{
// Aguarda o tick de janela SEM estar segurando o semáforo
var now = DateTime.UtcNow;
var delay = 60000 - (now.Second * 1000 + now.Millisecond);
Console.WriteLine($"!! Limite de requisições atingido. Aguardando até {DateTime.Now.AddMilliseconds(delay)}");
await Task.Delay(delay, ct);
continue;
SoapHelper.VerificarRespostaSOAP(resposta);
}
if (ex.ErrorCode == "4001" || ex.ErrorCode == "2001")
catch (SoapFaultException ex)
{
errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};SOAP Fault: {ex.ErrorCode};{ex.ErrorMessage.Replace("\n", "-n-")}");
break;
if (ex.ErrorCode == "2003")
{
// limite de requisições atingido -> aguardar tick de janela e tentar novamente a mesma página
var now = DateTime.UtcNow;
var delay = 60000 - (now.Second * 1000 + now.Millisecond);
Console.WriteLine($"!! Limite de requisições atingido. Aguardando até {DateTime.Now.AddMilliseconds(delay)}");
await Task.Delay(delay, ct);
continue; // não conta como tentativa extra; re-tenta a mesma página
}
if (ex.ErrorCode == "4001" || ex.ErrorCode == "2001")
{
// erro persistente, registra e interrompe processamento deste dia/ponto
errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};SOAP Fault: {ex.ErrorCode};{ex.ErrorMessage.Replace("\n", "-n-")}");
return;
}
throw;
}
throw;
}
// Parse e acumula medidas desta página
var doc = XDocument.Parse(resposta);
XNamespace ns = "http://xmlns.energia.org.br/BO/v2";
if (paginaAtual == 1)
{
int.TryParse(doc.Descendants().FirstOrDefault(e => e.Name.LocalName == "totalPaginas")?.Value, out totalPaginas);
}
var medidasPagina = doc.Descendants(ns + "medida")
.Where(x => (string)x.Element(ns + "tipoEnergia") == "L")
.ToList();
acumulador.AddRange(medidasPagina);
// página processada com sucesso
sucesso = true;
paginaAtual++;
}
catch (Exception ex)
{
tentativas++;
if (tentativas >= 5)
{
errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};Erro;{ex.Message.Replace("\n", "-n-")}");
// aborta o processamento do dia após falhas repetidas na mesma página
return;
}
else
{
int backoff = (int)Math.Pow(2.4, tentativas) * 1000;
Console.WriteLine($"Erro na requisição (página {paginaAtual}) ({ex.Message}), tentativa {tentativas}. Aguardando {backoff / 1000}s...");
await Task.Delay(backoff, ct);
}
}
} // fim tentativasPagina
} // fim while paginas
await ProcessarXMLAsync(resposta, dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, existentes, ct, endpoint, 1, null, 1, operacoesLog);
sucesso = true;
}
catch (Exception ex)
{
tentativas++;
if (tentativas >= 5)
{
errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};Erro;{ex.Message.Replace("\n", "-n-")}");
}
else
{
int backoff = (int)Math.Pow(2.4, tentativas) * 1000;
Console.WriteLine($"Erro na requisição ({ex.Message}), tentativa {tentativas}. Aguardando {backoff / 1000}s...");
await Task.Delay(backoff, ct);
}
}
// ao final de todas as páginas, processa o XML acumulado
try
{
await ProcessarXMLAsync(acumulador, dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, existentes, ct, operacoesLog);
}
catch (Exception ex)
{
errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};Erro;{ex.Message.Replace("\n", "-n-")}");
}
}
private async Task ProcessarXMLAsync(
string xml,
List<XElement> acumulador,
DateTime dia,
string perfil,
string ponto,
IDictionary<(string, double, int), Medicao> existentes,
CancellationToken ct,
Uri endpoint,
int paginaAtual = 1,
List<XElement>? acumulador = null,
int totalPaginas = 1,
ConcurrentBag<string>? operacoesLog = null)
{
var doc = XDocument.Parse(xml);
// Processa as medidas já acumuladas (antes chamadas faziam paginação)
XNamespace ns = "http://xmlns.energia.org.br/BO/v2";
int.TryParse(doc.Descendants().FirstOrDefault(e => e.Name.LocalName == "totalPaginas")?.Value, out totalPaginas);
int.TryParse(doc.Descendants().FirstOrDefault(e => e.Name.LocalName == "numero")?.Value, out paginaAtual);
var medidas = doc.Descendants(ns + "medida")
.Where(x => (string)x.Element(ns + "tipoEnergia") == "L");
acumulador ??= new List<XElement>();
acumulador.AddRange(medidas);
if (paginaAtual < totalPaginas)
{
// Próxima página: adquire o semáforo apenas para o HTTP e libera logo após
string payload = Xml_requisicao(dia, perfil, ponto, paginaAtual + 1);
var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml");
string proxXml;
using (var client = CreateHttpClient())
{
using var resp = await client.PostAsync(endpoint, conteudo, ct);
proxXml = await resp.Content.ReadAsStringAsync();
}
await ProcessarXMLAsync(proxXml, dia, perfil, ponto, existentes, ct, endpoint, paginaAtual + 1, acumulador, totalPaginas, operacoesLog);
return;
}
var medidasProcessadas = acumulador
.Select(m =>
{
@ -260,8 +281,8 @@ namespace Application
var lista = grupoHora.SelectMany(g => g).OrderBy(m => m.Minuto).ToList();
// Separar por origem
var logicas = lista.Where(m => m.Origem == "Inspeção Lógica").ToList();
var diarias = lista.Where(m => m.Origem == "Coleta Diária").ToList();
var logicas = lista.Where(m => m.Origem == "Inspeção Lógica").ToList();
var diarias = lista.Where(m => m.Origem == "Coleta Diária").ToList();
// Regra de prioridade
List<Medicao> selecionados;
@ -284,7 +305,7 @@ namespace Application
{
if (faltantes.Count > 3)
{
// Se mais de 3 faltantes na hora, não faz estimativa
// Se mais de 3 faltantes na hora, n<EFBFBD>o faz estimativa
var estimada = new Medicao(
ponto + "P",
(dia.ToOADate() - dia.ToOADate() % 1),
@ -351,27 +372,27 @@ namespace Application
if (novos.Any())
{
await _postgresRepository.InserirMedicoesAsync(novos, ct);
Console.WriteLine($"Inserido {novos.Count} registros. Ponto {ponto}. Dia {dia}");
Console.WriteLine($"Ponto {ponto}. Dia {dia:dd/MM/yyyy}. {novos.Count:D3} registros inseridos.");
operacoesLog?.Add($"{perfil};{ponto};{dia.ToOADate()};OK;Novos;{novos.Count};0");
}
if (alterados.Any())
{
await _postgresRepository.AtualizarMedicoesAsync(alterados, ct);
Console.WriteLine($"Atualizado {alterados.Count} registros. Ponto {ponto}. Dia {dia}");
Console.WriteLine($"Ponto {ponto}. Dia {dia:dd/MM/yyyy}. {alterados.Count:D3} registros atualizados.");
operacoesLog?.Add($"{perfil};{ponto};{dia.ToOADate()};OK;Atualizados;0;{alterados.Count}");
}
if (!novos.Any() && !alterados.Any())
{
Console.WriteLine($"Nenhuma alteração. Ponto {ponto}. Dia {dia}");
operacoesLog?.Add($"{perfil};{ponto};{dia.ToOADate()};OK;Sem alterações;0;0");
Console.WriteLine($"Ponto {ponto}. Dia {dia:dd/MM/yyyy}. 000 registros alterados.");
operacoesLog?.Add($"{perfil};{ponto};{dia.ToOADate()};OK;Sem alterações;0;0");
}
}
private static string Xml_requisicao(DateTime data_req, string perfil, string cod_ponto, int pagina)
{
string cam_ent, tex_req, sdat_req;
cam_ent = @"X:\Back\Plataforma de Integração CCEE\RequestPaginate.txt";
cam_ent = @"X:\Back\Plataforma de Integração CCEE\RequestPaginate.txt";
cod_ponto += "P";
sdat_req = data_req.ToString("yyyy-MM-ddT00:00:00");
tex_req = File.ReadAllText(cam_ent);
@ -394,5 +415,54 @@ namespace Application
return new HttpClient(handler);
}
// Token-bucket simples: emite tokens a cada intervalo (distribui as requisições ao longo do tempo)
private class TokenBucketRateLimiter : IDisposable
{
private readonly Channel<bool> _channel;
private readonly CancellationTokenSource _cts = new();
private readonly Task _producer;
public TokenBucketRateLimiter(int tokensPerMinute, int capacity = 400, int initialTokens = 0)
{
if (tokensPerMinute <= 0) throw new ArgumentOutOfRangeException(nameof(tokensPerMinute));
_channel = Channel.CreateBounded<bool>(new BoundedChannelOptions(capacity)
{
SingleWriter = true,
SingleReader = false,
FullMode = BoundedChannelFullMode.DropWrite
});
// opcional: preencher tokens iniciais (0 evita burst inicial)
for (int i = 0; i < Math.Min(initialTokens, capacity); i++)
_channel.Writer.TryWrite(true);
var intervalMs = (int)Math.Ceiling(60000.0 / tokensPerMinute); // ex: 400 -> 150ms
_producer = Task.Run(async () =>
{
try
{
while (!_cts.Token.IsCancellationRequested)
{
await Task.Delay(intervalMs, _cts.Token);
// tenta escrever; se cheio, descarta (evita acumular tokens além da capacidade)
_channel.Writer.TryWrite(true);
}
}
catch (OperationCanceledException) { }
}, _cts.Token);
}
public ValueTask<bool> WaitAsync(CancellationToken ct) => _channel.Reader.ReadAsync(ct);
public void Dispose()
{
_cts.Cancel();
_channel.Writer.TryComplete();
try { _producer?.GetAwaiter().GetResult(); } catch { }
_cts.Dispose();
}
}
}
}

View File

@ -22,8 +22,8 @@ namespace Infrastructure
string sql = $"SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 AND Unidade_gerenciada ORDER BY cod_smart_unidade";
//string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and (Cliente = 'RMC ALIMENTOS' OR Cliente = 'FERREIRA SUPERMERCADO' OR Cliente = 'VANGUARDA ALIMENTOS') AND Unidade_gerenciada ORDER BY PerfilCCEE";
//string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Cliente = 'ABEVÊ' and Unidade = 'ABV LOJA 29 - COXIM' AND Unidade_gerenciada ORDER BY PerfilCCEE";
//string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Cliente = 'RMC ALIMENTOS' AND Unidade_gerenciada ORDER BY PerfilCCEE";
//string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Codigo_SCDE = 'MTTMAUENTR101'";
//string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Cliente = 'SURF CENTER' AND Unidade_gerenciada ORDER BY PerfilCCEE";
// string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Codigo_SCDE = 'PIPMTEUFCHE01'";
using var command = new OleDbCommand(sql, connection);
using var reader = await command.ExecuteReaderAsync(ct);

View File

@ -1,9 +1,5 @@
using System.Data.OleDb;
using Application;
using Application;
using Infrastructure;
using System.Net;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
class Program
{
@ -11,12 +7,14 @@ class Program
{
DateTime inicio = DateTime.Now;
string PG_CONN_STRING_PROD = "Server = smart-energia-dev-pgsql.cykff7tj7mik.us-east-1.rds.amazonaws.com; Port = 5432; Database = smartenergiaprod; Username = postgres; Password = VfHml#Z78!%kvvNM; Timeout = 60; CommandTimeout = 60; ApplicationName = new_med_5_min; Connection Lifetime = 120; Minimum Pool Size = 2; Maximum Pool Size = 4;";
// string PG_CONN_STRING_PROD = "Server = 192.168.10.248; Port = 5432; Database = smartenergiadev; Username = postgres; Password = gds21; Timeout = 60; CommandTimeout = 60; ApplicationName = new_med_5_min; Connection Lifetime = 120; Minimum Pool Size = 2; Maximum Pool Size = 4;";
string ACCESS_CONN_STRING = @"Provider=Microsoft.ACE.OLEDB.12.0;Data Source=\\srv-dados\documentos\Middle\Informativo Setorial\Modelo Word\BD1_dados cadastrais e faturas.accdb;Jet OLEDB:Database Password=gds21";
string caminhoLog = $@"\\srv-dados\documentos\Back\Carteira x.x\Codigo\Erros\log_erros_{inicio:MM_dd_HH_mm}.csv";
//DateTime dataIni = new DateTime(inicio.Year, inicio.Month, 1);
//DateTime dataFim = new DateTime(inicio.Year, inicio.Month, inicio.Day);
DateTime dataIni = new DateTime(inicio.Year, 10, 01);
DateTime dataFim = new DateTime(inicio.Year, 10, 28);
//junho finalizado
DateTime dataIni = new DateTime(inicio.Year, 11, 01);
DateTime dataFim = new DateTime(inicio.Year, 11, 14);
// Configuração de dependências (pode usar um container DI depois)
var postgresRepo = new PostgresRepository(PG_CONN_STRING_PROD);