Melhorias em concorrência, logs e tratamento de erros

- Adicionado `SemaphoreSlim` para controle de requisições HTTP simultâneas.
- Implementado log detalhado de operações com `ConcurrentBag`.
- Melhorado tratamento de erros com mensagens enriquecidas.
- Refatorados métodos para suportar concorrência e paginação.
- Removido método `Interpolar` por não ser mais necessário.
- Ajustada consulta SQL em `AccessRepository` para novos filtros.
- Refatorada atualização em lote no `PostgresRepository`.
- Alterado intervalo de datas e suporte a proxy em `Program.cs`.
- Melhorias gerais de formatação, comentários e lógica de seleção.
This commit is contained in:
Adriano Serighelli 2025-10-24 14:18:49 -03:00
parent 325bc76757
commit c98aec2c24
4 changed files with 119 additions and 117 deletions

View File

@ -4,7 +4,6 @@ using System.Text;
using System.Xml.Linq;
using Domain;
using Infrastructure;
using static System.Runtime.InteropServices.JavaScript.JSType;
namespace Application
{
@ -30,35 +29,52 @@ namespace Application
public async Task ExecuteAsync(DateTime dataIni, DateTime dataFim, string caminhoLog, CancellationToken ct)
{
var errosPersistentes = new ConcurrentBag<string>();
var operacoesLog = new ConcurrentBag<string>();
var perfis = (await _accessRepository.ObterPerfisAsync(ct)).ToList();
_httpClient.DefaultRequestHeaders.Add("SOAPAction", "listarMedidaCincoMinutos");
var endpoint = new Uri("https://servicos.ccee.org.br/ws/v2/MedidaCincoMinutosBSv2");
var datas = Enumerable.Range(0, (dataFim - dataIni).Days).Select(i => dataIni.AddDays(i));
await Parallel.ForEachAsync(perfis, async (perfil, ct) =>
// Controle explícito de concorrência HTTP (assíncrona)
// Ajuste conforme latência observada (ex.: 32, 48, 64).
var maxConcurrentRequests = 10;
using var httpSemaphore = new SemaphoreSlim(maxConcurrentRequests);
await Parallel.ForEachAsync(perfis, async (perfil, ctPerfil) =>
{
Console.WriteLine($"{DateTime.Now}: Iniciado ponto {perfil.CodigoSCDE}");
if (perfil.Codigo5Minutos == "0" || perfil.Codigo5Minutos == string.Empty)
if (perfil.Codigo5Minutos == "0" || perfil.Codigo5Minutos == string.Empty || perfil.Codigo5Minutos == null)
{
Console.WriteLine($"Pular {perfil.CodigoSCDE} - (cod 5 min pendente)");
errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE}; cod_5min pendente");
errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dataIni.ToOADate()};ERRO;cod_5min pendente");
return;
}
var existentes = await ObterMedicoesComRetry(perfil.CodigoSCDE, dataIni, dataFim, ct, errosPersistentes);
var existentes = await ObterMedicoesComRetry(perfil.CodigoSCDE, dataIni, dataFim, ctPerfil, errosPersistentes);
foreach (DateTime dia in datas)
// Paraleliza os dias deste perfil; o semáforo limita as requisições simultâneas
await Parallel.ForEachAsync(datas, ctPerfil, async (dia, ctDia) =>
{
await ProcessarDiaAsync(perfil, dia, existentes, endpoint, errosPersistentes, ct);
try
{
await ProcessarDiaAsync(perfil, dia, existentes, endpoint, errosPersistentes, operacoesLog, ctDia, httpSemaphore);
}
catch (Exception ex)
{
operacoesLog.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};ERRO;{ex.Message.Replace("\n", "-n-")}");
}
});
Console.WriteLine($"{DateTime.Now}: Finalizado ponto {perfil.CodigoSCDE}");
});
if (errosPersistentes.Count > 0)
{
File.WriteAllLines(caminhoLog, new[] { "Perfil;Ponto;Status;Message" }.Concat(errosPersistentes));
}
// Cabeçalho do log
var linhasLog = new List<string> { "Perfil;Ponto;DiaNum;Status;Mensagem;Inseridos;Atualizados" };
linhasLog.AddRange(operacoesLog);
linhasLog.AddRange(errosPersistentes);
File.WriteAllLines(caminhoLog, linhasLog);
}
private async Task<IDictionary<(string, double, int), Medicao>> ObterMedicoesComRetry(
@ -76,7 +92,7 @@ namespace Application
tentativas++;
if (tentativas >= 3)
{
errosPersistentes.Add($"{codigoSCDE};Erro;{ex.Message.Replace("\n", "-n-")}");
errosPersistentes.Add($";{codigoSCDE};{dataIni.ToOADate()};Erro;{ex.Message.Replace("\n", "-n-")}");
throw;
}
int backoff = (int)Math.Pow(2, tentativas) * 1000;
@ -93,7 +109,9 @@ namespace Application
IDictionary<(string, double, int), Medicao> existentes,
Uri endpoint,
ConcurrentBag<string> errosPersistentes,
CancellationToken ct)
ConcurrentBag<string> operacoesLog,
CancellationToken ct,
SemaphoreSlim httpSemaphore)
{
if (perfil.DataDeMigracao > dia)
{
@ -111,9 +129,21 @@ namespace Application
string payload = Xml_requisicao(dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, 1);
var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml");
// SEGURAR O SEMÁFORO APENAS NA CHAMADA HTTP
await _rateLimiter.WaitAsync(ct);
using var response = await _httpClient.PostAsync(endpoint, conteudo, ct);
string resposta = await response.Content.ReadAsStringAsync();
HttpResponseMessage response;
string resposta;
await httpSemaphore.WaitAsync(ct);
try
{
response = await _httpClient.PostAsync(endpoint, conteudo, ct);
resposta = await response.Content.ReadAsStringAsync();
}
finally
{
httpSemaphore.Release();
}
if ((int)response.StatusCode >= 400)
{
@ -125,6 +155,7 @@ namespace Application
{
if (ex.ErrorCode == "2003")
{
// Aguarda o tick de janela SEM estar segurando o semáforo
var now = DateTime.UtcNow;
var delay = 60000 - (now.Second * 1000 + now.Millisecond);
Console.WriteLine($"!! Limite de requisições atingido. Aguardando até {DateTime.Now.AddMilliseconds(delay)}");
@ -133,14 +164,14 @@ namespace Application
}
if (ex.ErrorCode == "4001" || ex.ErrorCode == "2001")
{
errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};SOAP Fault: {ex.ErrorCode};{ex.ErrorMessage.Replace("\n", "-n-")}");
errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};SOAP Fault: {ex.ErrorCode};{ex.ErrorMessage.Replace("\n", "-n-")}");
break;
}
throw;
}
}
await ProcessarXMLAsync(resposta, dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, existentes, ct, 1);
await ProcessarXMLAsync(resposta, dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, existentes, ct, endpoint, httpSemaphore, 1, null, 1, operacoesLog);
sucesso = true;
}
catch (Exception ex)
@ -148,7 +179,7 @@ namespace Application
tentativas++;
if (tentativas >= 5)
{
errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};Erro;{ex.Message.Replace("\n", "-n-")}");
errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};Erro;{ex.Message.Replace("\n", "-n-")}");
}
else
{
@ -159,6 +190,7 @@ namespace Application
}
}
}
private async Task ProcessarXMLAsync(
string xml,
DateTime dia,
@ -166,9 +198,12 @@ namespace Application
string ponto,
IDictionary<(string, double, int), Medicao> existentes,
CancellationToken ct,
Uri endpoint,
SemaphoreSlim httpSemaphore,
int paginaAtual = 1,
List<XElement>? acumulador = null,
int totalPaginas = 1)
int totalPaginas = 1,
ConcurrentBag<string>? operacoesLog = null)
{
var doc = XDocument.Parse(xml);
XNamespace ns = "http://xmlns.energia.org.br/BO/v2";
@ -184,14 +219,25 @@ namespace Application
if (paginaAtual < totalPaginas)
{
// Requisita próxima página
// Próxima página: adquire o semáforo apenas para o HTTP e libera logo após
string payload = Xml_requisicao(dia, perfil, ponto, paginaAtual + 1);
var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml");
await _rateLimiter.WaitAsync(ct);
using var resp = await _httpClient.PostAsync("https://servicos.ccee.org.br/ws/v2/MedidaCincoMinutosBSv2", conteudo, ct);
string proxXml = await resp.Content.ReadAsStringAsync();
await ProcessarXMLAsync(proxXml, dia, perfil, ponto, existentes, ct, paginaAtual + 1, acumulador, totalPaginas);
await _rateLimiter.WaitAsync(ct);
string proxXml;
await httpSemaphore.WaitAsync(ct);
try
{
using var resp = await _httpClient.PostAsync(endpoint, conteudo, ct);
proxXml = await resp.Content.ReadAsStringAsync();
}
finally
{
httpSemaphore.Release();
}
await ProcessarXMLAsync(proxXml, dia, perfil, ponto, existentes, ct, endpoint, httpSemaphore, paginaAtual + 1, acumulador, totalPaginas, operacoesLog);
return;
}
@ -245,7 +291,7 @@ namespace Application
// Regra de prioridade
List<Medicao> selecionados;
if (logicas.Count > diarias.Count)
if (logicas.Count == 12 || logicas.Count > diarias.Count)
{
selecionados = logicas;
}
@ -332,14 +378,22 @@ namespace Application
{
await _postgresRepository.InserirMedicoesAsync(novos, ct);
Console.WriteLine($"Inserido {novos.Count} registros. Ponto {ponto}. Dia {dia}");
operacoesLog?.Add($"{perfil};{ponto};{dia.ToOADate()};OK;Novos;{novos.Count};0");
}
if (alterados.Any())
{
await _postgresRepository.AtualizarMedicoesAsync(alterados, ct);
Console.WriteLine($"Atualizado {alterados.Count} registros. Ponto {ponto}. Dia {dia}");
operacoesLog?.Add($"{perfil};{ponto};{dia.ToOADate()};OK;Atualizados;0;{alterados.Count}");
}
if (!novos.Any() && !alterados.Any())
{
Console.WriteLine($"Nenhuma alteração. Ponto {ponto}. Dia {dia}");
operacoesLog?.Add($"{perfil};{ponto};{dia.ToOADate()};OK;Sem alterações;0;0");
}
}
private static string Xml_requisicao(DateTime data_req, string perfil, string cod_ponto, int pagina)
{
string cam_ent, tex_req, sdat_req;
@ -353,37 +407,5 @@ namespace Application
tex_req = tex_req.Replace("PAGNUM", pagina.ToString());
return tex_req;
}
private static double? Interpolar(
double? xAnterior, double? yAnterior,
double? xPosterior, double? yPosterior,
double xProcurado)
{
if (xAnterior.HasValue && yAnterior.HasValue &&
(!xPosterior.HasValue || !yPosterior.HasValue))
{
return yAnterior.Value;
}
if (xPosterior.HasValue && yPosterior.HasValue &&
(!xAnterior.HasValue || !yAnterior.HasValue))
{
return yPosterior.Value;
}
if (xAnterior.HasValue && yAnterior.HasValue &&
xPosterior.HasValue && yPosterior.HasValue)
{
if (xPosterior.Value == xAnterior.Value)
throw new ArgumentException("xAnterior e xPosterior não podem ser iguais (divisão por zero).");
double yProcurado = yAnterior.Value +
((yPosterior.Value - yAnterior.Value) / (xPosterior.Value - xAnterior.Value)) *
(xProcurado - xAnterior.Value);
return yProcurado;
}
return null;
}
}
}

View File

@ -20,10 +20,10 @@ namespace Infrastructure
await connection.OpenAsync(ct);
//string sql = $"SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 AND Unidade_gerenciada ORDER BY cod_smart_unidade";
//string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and (Cliente = 'RMC ALIMENTOS' OR Cliente = 'FERREIRA SUPERMERCADO' OR Cliente = 'VANGUARDA ALIMENTOS') AND Unidade_gerenciada ORDER BY PerfilCCEE";
string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and (Cliente = 'RMC ALIMENTOS' OR Cliente = 'FERREIRA SUPERMERCADO' OR Cliente = 'VANGUARDA ALIMENTOS') AND Unidade_gerenciada ORDER BY PerfilCCEE";
//string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Cliente = 'ABEVÊ' and Unidade = 'ABV LOJA 29 - COXIM' AND Unidade_gerenciada ORDER BY PerfilCCEE";
string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Cliente = 'SIDERQUÍMICA' AND Unidade_gerenciada ORDER BY PerfilCCEE";
//string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Codigo_SCDE = 'DFLBCEENTR101'";
//string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Cliente = 'RMC ALIMENTOS' AND Unidade_gerenciada ORDER BY PerfilCCEE";
//string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Codigo_SCDE = '3016021620'";
using var command = new OleDbCommand(sql, connection);
using var reader = await command.ExecuteReaderAsync(ct);

View File

@ -75,65 +75,35 @@ namespace Infrastructure
await using var connection = await _dataSource.OpenConnectionAsync(ct);
using var batch = new NpgsqlBatch(connection);
// Gerar os parâmetros dinamicamente, mantendo a abordagem parametrizada
// Criar os valores da atualização, como se fosse uma tabela temporária
var valores = medicoes
.Select((m, index) => new
{
Index = index,
Ponto = m.Ponto,
DiaNum = m.DiaNum,
Minuto = m.Minuto,
Origem = m.Origem,
AtivaConsumo = m.AtivaConsumo,
AtivaGeracao = m.AtivaGeracao,
ReativaConsumo = m.ReativaConsumo,
ReativaGeracao = m.ReativaGeracao
})
.Select(m => $"('{m.Ponto}', {m.DiaNum}, {m.Minuto}, '{m.Origem}', " +
$"{(m.AtivaConsumo.HasValue ? m.AtivaConsumo.Value.ToString(CultureInfo.InvariantCulture) : "NULL")}, " +
$"{(m.AtivaGeracao.HasValue ? m.AtivaGeracao.Value.ToString(CultureInfo.InvariantCulture) : "NULL")}, " +
$"{(m.ReativaConsumo.HasValue ? m.ReativaConsumo.Value.ToString(CultureInfo.InvariantCulture) : "NULL")}, " +
$"{(m.ReativaGeracao.HasValue ? m.ReativaGeracao.Value.ToString(CultureInfo.InvariantCulture) : "NULL")})")
.ToList();
var query = @"
// Gerar a query dinâmica com os valores em formato de tabela temporária
var query = $@"
UPDATE med_5min
SET origem = nv.origem,
ativa_consumo = nv.ativa_consumo,
ativa_geracao = nv.ativa_geracao,
reativa_consumo = nv.reativa_consumo,
reativa_geracao = nv.reativa_geracao
FROM (VALUES";
// Adicionar os valores para o `VALUES`
for (int i = 0; i < valores.Count; i++)
{
query += $" (@ponto_{i}, @dia_num_{i}, @minuto_{i}, @origem_{i}, " +
$"@ativa_consumo_{i}, @ativa_geracao_{i}, @reativa_consumo_{i}, @reativa_geracao_{i})";
if (i < valores.Count - 1)
{
query += ", ";
}
}
query += @") AS nv (ponto, dia_num, minuto, origem, ativa_consumo, ativa_geracao, reativa_consumo, reativa_geracao)
ativa_consumo = CAST(nv.ativa_consumo AS numeric),
ativa_geracao = CAST(nv.ativa_geracao AS numeric),
reativa_consumo = CAST(nv.reativa_consumo AS numeric),
reativa_geracao = CAST(nv.reativa_geracao AS numeric)
FROM (VALUES
{string.Join(",", valores)}
) AS nv (ponto, dia_num, minuto, origem, ativa_consumo, ativa_geracao, reativa_consumo, reativa_geracao)
WHERE med_5min.ponto = nv.ponto
AND med_5min.dia_num = nv.dia_num
AND med_5min.minuto = nv.minuto;";
// Agora, vamos adicionar os parâmetros à query
// Criação do comando NpgsqlBatchCommand
var cmd = new NpgsqlBatchCommand(query);
foreach (var valor in valores)
{
cmd.Parameters.AddWithValue($"ponto_{valor.Index}", valor.Ponto);
cmd.Parameters.AddWithValue($"dia_num_{valor.Index}", valor.DiaNum);
cmd.Parameters.AddWithValue($"minuto_{valor.Index}", valor.Minuto);
cmd.Parameters.AddWithValue($"origem_{valor.Index}", valor.Origem);
cmd.Parameters.AddWithValue($"ativa_consumo_{valor.Index}", valor.AtivaConsumo ?? (object)DBNull.Value);
cmd.Parameters.AddWithValue($"ativa_geracao_{valor.Index}", valor.AtivaGeracao ?? (object)DBNull.Value);
cmd.Parameters.AddWithValue($"reativa_consumo_{valor.Index}", valor.ReativaConsumo ?? (object)DBNull.Value);
cmd.Parameters.AddWithValue($"reativa_geracao_{valor.Index}", valor.ReativaGeracao ?? (object)DBNull.Value);
}
batch.BatchCommands.Add(cmd);
// Executar a query
// Executar o comando
await batch.ExecuteNonQueryAsync(ct);
}
}

View File

@ -1,6 +1,9 @@
using System.Data.OleDb;
using Application;
using Infrastructure;
using System.Net;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
class Program
{
@ -13,7 +16,7 @@ class Program
//DateTime dataIni = new DateTime(inicio.Year, inicio.Month, 1);
//DateTime dataFim = new DateTime(inicio.Year, inicio.Month, inicio.Day);
DateTime dataIni = new DateTime(inicio.Year, 10, 01);
DateTime dataFim = new DateTime(inicio.Year, 10, 16);
DateTime dataFim = new DateTime(inicio.Year, 10, 23);
// Configuração de dependências (pode usar um container DI depois)
var postgresRepo = new PostgresRepository(PG_CONN_STRING_PROD);
@ -21,16 +24,23 @@ class Program
var httpClient = new HttpClient(new HttpClientHandler
{
ClientCertificateOptions = ClientCertificateOption.Automatic,
//Proxy = new WebProxy("127.0.0.1", 8888),
//UseProxy = true,
//ServerCertificateCustomValidationCallback = (HttpRequestMessage req, X509Certificate2? cert, X509Chain? chain, SslPolicyErrors errors) => true
Proxy = new WebProxy("127.0.0.1", 8888),
UseProxy = true,
ServerCertificateCustomValidationCallback = (HttpRequestMessage req, X509Certificate2? cert, X509Chain? chain, SslPolicyErrors errors) => true
});
var rateLimiter = new RateLimiter(400, TimeSpan.FromMinutes(1));
var useCase = new ProcessarMedicoesUseCase(postgresRepo, accessRepo, httpClient, rateLimiter);
await useCase.ExecuteAsync(dataIni, dataFim, caminhoLog, CancellationToken.None);
Console.WriteLine($"Concluído. Tempo total: {DateTime.Now - inicio}");
Console.ReadKey();
string input = string.Empty;
while (input.ToLower() != "fim")
{
Console.WriteLine("Digite 'fim' para finalizar:");
input = Console.ReadLine();
}
}
}