515 lines
23 KiB
C#
515 lines
23 KiB
C#
using System.Collections.Concurrent;
|
|
using System.Globalization;
|
|
using System.Security.Cryptography.X509Certificates;
|
|
using System.Text;
|
|
using System.Threading.Channels;
|
|
using System.Xml.Linq;
|
|
using Domain;
|
|
|
|
namespace Application
|
|
{
|
|
public class ProcessarMedicoesUseCase : IDisposable
|
|
{
|
|
private readonly IPostgresRepository _postgresRepository;
|
|
private readonly IAccessRepository _accessRepository;
|
|
private readonly TokenBucketRateLimiter _rateLimiter;
|
|
|
|
public ProcessarMedicoesUseCase(
|
|
IPostgresRepository postgresRepository,
|
|
IAccessRepository accessRepository)
|
|
{
|
|
_postgresRepository = postgresRepository;
|
|
_accessRepository = accessRepository;
|
|
// 400 requisições por minuto
|
|
_rateLimiter = new TokenBucketRateLimiter(4000, capacity: 4000);
|
|
}
|
|
|
|
public void Dispose()
|
|
{
|
|
_rateLimiter?.Dispose();
|
|
}
|
|
|
|
private enum LogType { Info, Error, Operation, UpdateMeasurement }
|
|
|
|
private record LogItem(
|
|
LogType Tipo,
|
|
string Perfil,
|
|
string Ponto,
|
|
double DiaNum,
|
|
int Minuto,
|
|
string Status,
|
|
string Mensagem,
|
|
int Inseridos,
|
|
int Atualizados,
|
|
Medicao? Antes,
|
|
Medicao? Depois,
|
|
DateTime Timestamp)
|
|
{
|
|
public string ToCsvLine()
|
|
{
|
|
static string Esc(string? s) => (s ?? "").Replace(";", ",").Replace("\r", " ").Replace("\n", " ");
|
|
static string FMed(Medicao? m) =>
|
|
m is null ? "" : $"{Esc(m.Ponto)}|{m.DiaNum}|{m.Minuto}|{Esc(m.Origem)}|{m.AtivaConsumo?.ToString(CultureInfo.InvariantCulture) ?? ""}|{m.AtivaGeracao?.ToString(CultureInfo.InvariantCulture) ?? ""}|{m.ReativaConsumo?.ToString(CultureInfo.InvariantCulture) ?? ""}|{m.ReativaGeracao?.ToString(CultureInfo.InvariantCulture) ?? ""}";
|
|
|
|
return string.Join(";", new[]
|
|
{
|
|
Tipo.ToString(),
|
|
Esc(Perfil),
|
|
Esc(Ponto),
|
|
DateTime.FromOADate(DiaNum).ToString("dd/MM/yyyy"),
|
|
Minuto.ToString(),
|
|
Esc(Status),
|
|
Esc(Mensagem),
|
|
Inseridos.ToString(),
|
|
Atualizados.ToString(),
|
|
FMed(Antes),
|
|
FMed(Depois),
|
|
Timestamp.ToString("o")
|
|
});
|
|
}
|
|
}
|
|
|
|
public async Task ExecuteAsync(DateTime dataIni, DateTime dataFim, string caminhoLog, CancellationToken ct)
|
|
{
|
|
var logs = new ConcurrentBag<LogItem>();
|
|
var perfis = (await _accessRepository.ObterPerfisAsync(ct)).ToList();
|
|
|
|
var endpoint = new Uri("https://servicos.ccee.org.br/ws/v2/MedidaCincoMinutosBSv2");
|
|
var datas = Enumerable.Range(0, (dataFim - dataIni).Days).Select(i => dataIni.AddDays(i));
|
|
|
|
await Parallel.ForEachAsync(perfis, async (perfil, ctPerfil) =>
|
|
{
|
|
if (perfil.Codigo5Minutos == "0" || perfil.Codigo5Minutos == string.Empty || perfil.Codigo5Minutos == null)
|
|
{
|
|
Console.WriteLine($"Pular {perfil.CodigoSCDE} - (cod 5 min pendente)");
|
|
logs.Add(new LogItem(LogType.Error, perfil.Codigo5Minutos ?? "", perfil.CodigoSCDE ?? "", dataIni.ToOADate(), 0, "ERRO", "cod_5min pendente", 0, 0, null, null, DateTime.UtcNow));
|
|
return;
|
|
}
|
|
|
|
var existentes = await ObterMedicoesComRetry(perfil.CodigoSCDE, dataIni, dataFim, ctPerfil, logs);
|
|
|
|
await Parallel.ForEachAsync(datas, ctPerfil, async (dia, ctDia) =>
|
|
{
|
|
try
|
|
{
|
|
await ProcessarDiaAsync(perfil, dia, existentes, endpoint, logs, ctDia);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
logs.Add(new LogItem(LogType.Error, perfil.Codigo5Minutos ?? "", perfil.CodigoSCDE ?? "", dia.ToOADate(), 0, "ERRO", ex.Message.Replace("\n", "-n-"), 0, 0, null, null, DateTime.UtcNow));
|
|
}
|
|
});
|
|
});
|
|
|
|
var linhas = new List<string>
|
|
{
|
|
"Tipo;Perfil;Ponto;DiaNum;Minuto;Status;Mensagem;Inseridos;Atualizados;Antes;Depois;Timestamp"
|
|
};
|
|
linhas.AddRange(logs.Select(l => l.ToCsvLine()));
|
|
|
|
File.WriteAllLines(caminhoLog, linhas);
|
|
}
|
|
|
|
private async Task<IDictionary<(string, double, int), Medicao>> ObterMedicoesComRetry(
|
|
string codigoSCDE, DateTime dataIni, DateTime dataFim, CancellationToken ct, ConcurrentBag<LogItem> logs)
|
|
{
|
|
int tentativas = 0;
|
|
while (tentativas < 3)
|
|
{
|
|
try
|
|
{
|
|
return await _postgresRepository.ObterMedicoesAsync(codigoSCDE, dataIni, dataFim, ct);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
tentativas++;
|
|
if (tentativas >= 3)
|
|
{
|
|
logs.Add(new LogItem(LogType.Error, "", codigoSCDE, dataIni.ToOADate(), 0, "Erro", ex.Message.Replace("\n", "-n-"), 0, 0, null, null, DateTime.UtcNow));
|
|
throw;
|
|
}
|
|
int backoff = (int)Math.Pow(2, tentativas) * 1000;
|
|
Console.WriteLine($"Erro ao acessar Postgres ({ex.Message}), tentativa {tentativas}. Aguardando {backoff / 1000}s...");
|
|
await Task.Delay(backoff, ct);
|
|
}
|
|
}
|
|
return new Dictionary<(string, double, int), Medicao>();
|
|
}
|
|
|
|
private async Task ProcessarDiaAsync(
|
|
Perfil perfil,
|
|
DateTime dia,
|
|
IDictionary<(string, double, int), Medicao> existentes,
|
|
Uri endpoint,
|
|
ConcurrentBag<LogItem> logs,
|
|
CancellationToken ct)
|
|
{
|
|
if (perfil.DataDeMigracao > dia)
|
|
{
|
|
Console.WriteLine($"Pular {perfil.CodigoSCDE} - {dia.ToShortDateString()} (antes da migração)");
|
|
logs.Add(new LogItem(LogType.Info, perfil.Codigo5Minutos ?? "", perfil.CodigoSCDE ?? "", dia.ToOADate(), 0, "Fora da data de migração", $"Data de migração {perfil.DataDeMigracao} x {dia}", 0, 0, null, null, DateTime.UtcNow));
|
|
return;
|
|
}
|
|
|
|
// Acumulador de medidas (todas as páginas)
|
|
var acumulador = new List<XElement>();
|
|
int paginaAtual = 1;
|
|
int totalPaginas = 1;
|
|
|
|
while (paginaAtual <= totalPaginas)
|
|
{
|
|
int tentativas = 0;
|
|
bool sucesso = false;
|
|
|
|
while (tentativas < 5 && !sucesso)
|
|
{
|
|
try
|
|
{
|
|
string payload = Xml_requisicao(dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, paginaAtual);
|
|
var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml");
|
|
|
|
// Aguarda token do rate limiter antes de cada requisição
|
|
await _rateLimiter.WaitAsync(ct);
|
|
|
|
HttpResponseMessage response;
|
|
string resposta;
|
|
|
|
using (var client = CreateHttpClient())
|
|
{
|
|
response = await client.PostAsync(endpoint, conteudo, ct);
|
|
resposta = await response.Content.ReadAsStringAsync();
|
|
}
|
|
|
|
if ((int)response.StatusCode >= 400)
|
|
{
|
|
try
|
|
{
|
|
SoapHelper.VerificarRespostaSOAP(resposta);
|
|
}
|
|
catch (SoapFaultException ex)
|
|
{
|
|
if (ex.ErrorCode == "2003")
|
|
{
|
|
// limite de requisições atingido -> aguardar tick de janela e tentar novamente a mesma página
|
|
var now = DateTime.UtcNow;
|
|
var delay = 60000 - (now.Second * 1000 + now.Millisecond);
|
|
Console.WriteLine($"!! Limite de requisições atingido. Aguardando até {DateTime.Now.AddMilliseconds(delay)}");
|
|
await Task.Delay(delay, ct);
|
|
continue; // não conta como tentativa extra; re-tenta a mesma página
|
|
}
|
|
if (ex.ErrorCode == "4001" || ex.ErrorCode == "2001")
|
|
{
|
|
// erro persistente, registra e interrompe processamento deste dia/ponto
|
|
logs.Add(new LogItem(LogType.Error, perfil.Codigo5Minutos ?? "", perfil.CodigoSCDE ?? "", dia.ToOADate(), 0, "SOAP Fault", $"{ex.ErrorCode};{ex.ErrorMessage.Replace("\n", "-n-")}", 0, 0, null, null, DateTime.UtcNow));
|
|
return;
|
|
}
|
|
throw;
|
|
}
|
|
}
|
|
|
|
// Parse e acumula medidas desta página
|
|
var doc = XDocument.Parse(resposta);
|
|
XNamespace ns = "http://xmlns.energia.org.br/BO/v2";
|
|
|
|
if (paginaAtual == 1)
|
|
{
|
|
int.TryParse(doc.Descendants().FirstOrDefault(e => e.Name.LocalName == "totalPaginas")?.Value, out totalPaginas);
|
|
}
|
|
|
|
var medidasPagina = doc.Descendants(ns + "medida")
|
|
.Where(x => (string)x.Element(ns + "tipoEnergia") == "L")
|
|
.ToList();
|
|
|
|
acumulador.AddRange(medidasPagina);
|
|
|
|
// página processada com sucesso
|
|
sucesso = true;
|
|
paginaAtual++;
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
tentativas++;
|
|
if (tentativas >= 5)
|
|
{
|
|
logs.Add(new LogItem(LogType.Error, perfil.Codigo5Minutos ?? "", perfil.CodigoSCDE ?? "", dia.ToOADate(), 0, "Erro", ex.Message.Replace("\n", "-n-"), 0, 0, null, null, DateTime.UtcNow));
|
|
// aborta o processamento do dia após falhas repetidas na mesma página
|
|
return;
|
|
}
|
|
else
|
|
{
|
|
int backoff = (int)Math.Pow(2.4, tentativas) * 1000;
|
|
Console.WriteLine($"Erro na requisição (página {paginaAtual}) ({ex.Message}), tentativa {tentativas}. Aguardando {backoff / 1000}s...");
|
|
await Task.Delay(backoff, ct);
|
|
}
|
|
}
|
|
} // fim tentativasPagina
|
|
} // fim while paginas
|
|
|
|
// ao final de todas as páginas, processa o XML acumulado
|
|
try
|
|
{
|
|
await ProcessarXMLAsync(acumulador, dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, existentes, ct, logs);
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
logs.Add(new LogItem(LogType.Error, perfil.Codigo5Minutos ?? "", perfil.CodigoSCDE ?? "", dia.ToOADate(), 0, "Erro", ex.Message.Replace("\n", "-n-"), 0, 0, null, null, DateTime.UtcNow));
|
|
}
|
|
}
|
|
|
|
private async Task ProcessarXMLAsync(
|
|
List<XElement> acumulador,
|
|
DateTime dia,
|
|
string perfil,
|
|
string ponto,
|
|
IDictionary<(string, double, int), Medicao> existentes,
|
|
CancellationToken ct,
|
|
ConcurrentBag<LogItem>? logs = null)
|
|
{
|
|
logs ??= new ConcurrentBag<LogItem>();
|
|
|
|
XNamespace ns = "http://xmlns.energia.org.br/BO/v2";
|
|
|
|
var medidasProcessadas = acumulador
|
|
.Select(m =>
|
|
{
|
|
string origem = m.Element(ns + "coletaMedicao")?.Element(ns + "tipo")?.Element(ns + "nome")?.Value ?? "";
|
|
string pontoMed = m.Element(ns + "medidor")?.Element(ns + "codigo")?.Value ?? "";
|
|
DateTime data = DateTime.Parse(m.Element(ns + "data")?.Value ?? "");
|
|
double diaNum = (data.ToOADate() - data.ToOADate() % 1);
|
|
int minuto = data.Hour * 60 + data.Minute;
|
|
if (minuto == 0) { minuto = 1440; diaNum--; }
|
|
|
|
double.TryParse(m.Element(ns + "energiaAtiva")?.Element(ns + "consumo")?.Element(ns + "valor")?.Value,
|
|
NumberStyles.Any, CultureInfo.InvariantCulture, out double ativa_c);
|
|
double.TryParse(m.Element(ns + "energiaAtiva")?.Element(ns + "geracao")?.Element(ns + "valor")?.Value,
|
|
NumberStyles.Any, CultureInfo.InvariantCulture, out double ativa_g);
|
|
double.TryParse(m.Element(ns + "energiaReativa")?.Element(ns + "consumo")?.Element(ns + "valor")?.Value,
|
|
NumberStyles.Any, CultureInfo.InvariantCulture, out double reat_c);
|
|
double.TryParse(m.Element(ns + "energiaReativa")?.Element(ns + "geracao")?.Element(ns + "valor")?.Value,
|
|
NumberStyles.Any, CultureInfo.InvariantCulture, out double reat_g);
|
|
|
|
return new Medicao(
|
|
pontoMed,
|
|
diaNum,
|
|
minuto,
|
|
origem,
|
|
ativa_c,
|
|
ativa_g,
|
|
reat_c,
|
|
reat_g
|
|
);
|
|
})
|
|
.ToList();
|
|
|
|
var minutosEsperados = new[] { 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60 };
|
|
|
|
var medidasPorHora = medidasProcessadas
|
|
.GroupBy(m => new { m.Ponto, m.DiaNum, Hora = (m.Minuto - 5) / 60 })
|
|
.ToList();
|
|
|
|
var medidasComEstimativa = new List<Medicao>();
|
|
for (int hora = 0; hora < 24; hora++)
|
|
{
|
|
var grupoHora = medidasPorHora.Where(h => h.Key.Hora == hora).ToList();
|
|
var lista = grupoHora.SelectMany(g => g).OrderBy(m => m.Minuto).ToList();
|
|
|
|
// Separar por origem
|
|
var logicas = lista.Where(m => m.Origem == "Inspeção Lógica").ToList();
|
|
var diarias = lista.Where(m => m.Origem == "Coleta Diária").ToList();
|
|
|
|
// Regra de prioridade
|
|
List<Medicao> selecionados;
|
|
if (logicas.Count == 12 || logicas.Count > diarias.Count)
|
|
{
|
|
selecionados = logicas;
|
|
}
|
|
else
|
|
{
|
|
selecionados = diarias;
|
|
}
|
|
|
|
var minutosPresentes = selecionados.Select(m => m.Minuto).ToHashSet();
|
|
var minutosEsperadosAbsolutos = minutosEsperados.Select(m => m + (60 * hora)).ToList();
|
|
var faltantes = minutosEsperadosAbsolutos.Except(minutosPresentes).OrderBy(m => m).ToList();
|
|
|
|
var estimadas = new List<Medicao>();
|
|
|
|
foreach (var faltante in faltantes)
|
|
{
|
|
if (faltantes.Count > 3)
|
|
{
|
|
// Se mais de 3 faltantes na hora, não faz estimativa
|
|
var estimada = new Medicao(
|
|
ponto + "P",
|
|
(dia.ToOADate() - dia.ToOADate() % 1),
|
|
faltante,
|
|
"Faltante",
|
|
null,
|
|
null,
|
|
null,
|
|
null
|
|
);
|
|
estimadas.Add(estimada);
|
|
continue;
|
|
}
|
|
else
|
|
{
|
|
var ativaConsumo = selecionados.Average(m => m.AtivaConsumo);
|
|
var ativaGeracao = selecionados.Average(m => m.AtivaGeracao);
|
|
var reativaConsumo = selecionados.Average(m => m.ReativaConsumo);
|
|
var reativaGeracao = selecionados.Average(m => m.ReativaGeracao);
|
|
|
|
var estimada = new Medicao(
|
|
grupoHora.First().Key.Ponto,
|
|
grupoHora.First().Key.DiaNum,
|
|
faltante,
|
|
"Estimado",
|
|
ativaConsumo,
|
|
ativaGeracao,
|
|
reativaConsumo,
|
|
reativaGeracao
|
|
);
|
|
estimadas.Add(estimada);
|
|
}
|
|
}
|
|
|
|
// Adiciona todos (originais + estimados) ao resultado final
|
|
medidasComEstimativa.AddRange(selecionados);
|
|
medidasComEstimativa.AddRange(estimadas);
|
|
}
|
|
|
|
var novos = new List<Medicao>();
|
|
var alterados = new List<Medicao>();
|
|
|
|
foreach (var m in medidasComEstimativa)
|
|
{
|
|
var chave = (m.Ponto, m.DiaNum, m.Minuto);
|
|
|
|
if (!existentes.TryGetValue(chave, out var existente))
|
|
{
|
|
novos.Add(m);
|
|
}
|
|
else
|
|
{
|
|
if (existente.Origem != m.Origem ||
|
|
Math.Round(existente.AtivaConsumo ?? 0, 10) != Math.Round(m.AtivaConsumo ?? 0, 10) ||
|
|
Math.Round(existente.AtivaGeracao ?? 0, 10) != Math.Round(m.AtivaGeracao ?? 0, 10) ||
|
|
Math.Round(existente.ReativaConsumo ?? 0, 10) != Math.Round(m.ReativaConsumo ?? 0, 10) ||
|
|
Math.Round(existente.ReativaGeracao ?? 0, 10) != Math.Round(m.ReativaGeracao ?? 0, 10))
|
|
{
|
|
alterados.Add(m);
|
|
// log detalhado por medição alterada
|
|
logs.Add(new LogItem(
|
|
LogType.UpdateMeasurement,
|
|
perfil,
|
|
m.Ponto,
|
|
m.DiaNum,
|
|
m.Minuto,
|
|
"ALTERADO",
|
|
"Medição alterada (antes x depois)",
|
|
0,
|
|
1,
|
|
existente,
|
|
m,
|
|
DateTime.UtcNow));
|
|
}
|
|
}
|
|
}
|
|
|
|
if (novos.Any())
|
|
{
|
|
await _postgresRepository.InserirMedicoesAsync(novos, ct);
|
|
Console.WriteLine($"Ponto {ponto}. Dia {dia:dd/MM/yyyy}. {novos.Count:D3} registros inseridos.");
|
|
logs.Add(new LogItem(LogType.Operation, perfil, ponto, dia.ToOADate(), 0, "OK", "Novos", novos.Count, 0, null, null, DateTime.UtcNow));
|
|
}
|
|
|
|
if (alterados.Any())
|
|
{
|
|
await _postgresRepository.AtualizarMedicoesAsync(alterados, ct);
|
|
Console.WriteLine($"Ponto {ponto}. Dia {dia:dd/MM/yyyy}. {alterados.Count:D3} registros atualizados.");
|
|
// logs.Add(new LogItem(LogType.Operation, perfil, ponto, dia.ToOADate(), 0, "OK", "Atualizados", 0, alterados.Count, null, null, DateTime.UtcNow));
|
|
}
|
|
if (!novos.Any() && !alterados.Any())
|
|
{
|
|
Console.WriteLine($"Ponto {ponto}. Dia {dia:dd/MM/yyyy}. 000 registros alterados.");
|
|
logs.Add(new LogItem(LogType.Info, perfil, ponto, dia.ToOADate(), 0, "OK", "Sem alterações", 0, 0, null, null, DateTime.UtcNow));
|
|
}
|
|
}
|
|
|
|
private static string Xml_requisicao(DateTime data_req, string perfil, string cod_ponto, int pagina)
|
|
{
|
|
string cam_ent, tex_req, sdat_req;
|
|
cam_ent = @"X:\Back\Plataforma de Integração CCEE\RequestPaginate.txt";
|
|
cod_ponto += "P";
|
|
sdat_req = data_req.ToString("yyyy-MM-ddT00:00:00");
|
|
tex_req = File.ReadAllText(cam_ent);
|
|
tex_req = tex_req.Replace("DATAALTERADA", sdat_req);
|
|
tex_req = tex_req.Replace("PONTOMEDICAO", cod_ponto);
|
|
tex_req = tex_req.Replace("CODPERFIL", perfil);
|
|
tex_req = tex_req.Replace("PAGNUM", pagina.ToString());
|
|
return tex_req;
|
|
}
|
|
private static HttpClient CreateHttpClient()
|
|
{
|
|
// Configura o HttpClientHandler para ignorar erros SSL
|
|
var handler = new HttpClientHandler
|
|
{
|
|
ServerCertificateCustomValidationCallback = (message, cert, chain, errors) => true
|
|
};
|
|
|
|
// Adiciona o certificado SSL
|
|
handler.ClientCertificates.Add(new X509Certificate2(@"X:\Back\APP Smart\Certificado\cert_ssl.pfx", "appsmart"));
|
|
|
|
return new HttpClient(handler);
|
|
}
|
|
|
|
// Token-bucket simples: emite tokens a cada intervalo (distribui as requisições ao longo do tempo)
|
|
private class TokenBucketRateLimiter : IDisposable
|
|
{
|
|
private readonly Channel<bool> _channel;
|
|
private readonly CancellationTokenSource _cts = new();
|
|
private readonly Task _producer;
|
|
|
|
public TokenBucketRateLimiter(int tokensPerMinute, int capacity = 400, int initialTokens = 0)
|
|
{
|
|
if (tokensPerMinute <= 0) throw new ArgumentOutOfRangeException(nameof(tokensPerMinute));
|
|
_channel = Channel.CreateBounded<bool>(new BoundedChannelOptions(capacity)
|
|
{
|
|
SingleWriter = true,
|
|
SingleReader = false,
|
|
FullMode = BoundedChannelFullMode.DropWrite
|
|
});
|
|
|
|
// opcional: preencher tokens iniciais (0 evita burst inicial)
|
|
for (int i = 0; i < Math.Min(initialTokens, capacity); i++)
|
|
_channel.Writer.TryWrite(true);
|
|
|
|
var intervalMs = (int)Math.Ceiling(60000.0 / tokensPerMinute); // ex: 400 -> 150ms
|
|
|
|
_producer = Task.Run(async () =>
|
|
{
|
|
try
|
|
{
|
|
while (!_cts.Token.IsCancellationRequested)
|
|
{
|
|
await Task.Delay(intervalMs, _cts.Token);
|
|
// tenta escrever; se cheio, descarta (evita acumular tokens além da capacidade)
|
|
_channel.Writer.TryWrite(true);
|
|
}
|
|
}
|
|
catch (OperationCanceledException) { }
|
|
}, _cts.Token);
|
|
}
|
|
|
|
public ValueTask<bool> WaitAsync(CancellationToken ct) => _channel.Reader.ReadAsync(ct);
|
|
|
|
public void Dispose()
|
|
{
|
|
_cts.Cancel();
|
|
_channel.Writer.TryComplete();
|
|
try { _producer?.GetAwaiter().GetResult(); } catch { }
|
|
_cts.Dispose();
|
|
}
|
|
}
|
|
}
|
|
} |