using System.Collections.Concurrent; using System.Globalization; using System.Security.Cryptography.X509Certificates; using System.Text; using System.Threading.Channels; using System.Xml.Linq; using Domain; namespace Application { public class ProcessarMedicoesUseCase : IDisposable { private readonly IPostgresRepository _postgresRepository; private readonly IAccessRepository _accessRepository; private readonly TokenBucketRateLimiter _rateLimiter; public ProcessarMedicoesUseCase( IPostgresRepository postgresRepository, IAccessRepository accessRepository) { _postgresRepository = postgresRepository; _accessRepository = accessRepository; // 400 requisições por minuto _rateLimiter = new TokenBucketRateLimiter(4000, capacity: 4000); } public void Dispose() { _rateLimiter?.Dispose(); } private enum LogType { Info, Error, Operation, UpdateMeasurement } private record LogItem( LogType Tipo, string Perfil, string Ponto, double DiaNum, int Minuto, string Status, string Mensagem, int Inseridos, int Atualizados, Medicao? Antes, Medicao? Depois, DateTime Timestamp) { public string ToCsvLine() { static string Esc(string? s) => (s ?? "").Replace(";", ",").Replace("\r", " ").Replace("\n", " "); static string FMed(Medicao? m) => m is null ? "" : $"{Esc(m.Ponto)}|{m.DiaNum}|{m.Minuto}|{Esc(m.Origem)}|{m.AtivaConsumo?.ToString(CultureInfo.InvariantCulture) ?? ""}|{m.AtivaGeracao?.ToString(CultureInfo.InvariantCulture) ?? ""}|{m.ReativaConsumo?.ToString(CultureInfo.InvariantCulture) ?? ""}|{m.ReativaGeracao?.ToString(CultureInfo.InvariantCulture) ?? ""}"; return string.Join(";", new[] { Tipo.ToString(), Esc(Perfil), Esc(Ponto), DateTime.FromOADate(DiaNum).ToString("dd/MM/yyyy"), Minuto.ToString(), Esc(Status), Esc(Mensagem), Inseridos.ToString(), Atualizados.ToString(), FMed(Antes), FMed(Depois), Timestamp.ToString("o") }); } } public async Task ExecuteAsync(DateTime dataIni, DateTime dataFim, string caminhoLog, CancellationToken ct) { var logs = new ConcurrentBag(); var perfis = (await _accessRepository.ObterPerfisAsync(ct)).ToList(); var endpoint = new Uri("https://servicos.ccee.org.br/ws/v2/MedidaCincoMinutosBSv2"); var datas = Enumerable.Range(0, (dataFim - dataIni).Days).Select(i => dataIni.AddDays(i)); await Parallel.ForEachAsync(perfis, async (perfil, ctPerfil) => { if (perfil.Codigo5Minutos == "0" || perfil.Codigo5Minutos == string.Empty || perfil.Codigo5Minutos == null) { Console.WriteLine($"Pular {perfil.CodigoSCDE} - (cod 5 min pendente)"); logs.Add(new LogItem(LogType.Error, perfil.Codigo5Minutos ?? "", perfil.CodigoSCDE ?? "", dataIni.ToOADate(), 0, "ERRO", "cod_5min pendente", 0, 0, null, null, DateTime.UtcNow)); return; } var existentes = await ObterMedicoesComRetry(perfil.CodigoSCDE, dataIni, dataFim, ctPerfil, logs); await Parallel.ForEachAsync(datas, ctPerfil, async (dia, ctDia) => { try { await ProcessarDiaAsync(perfil, dia, existentes, endpoint, logs, ctDia); } catch (Exception ex) { logs.Add(new LogItem(LogType.Error, perfil.Codigo5Minutos ?? "", perfil.CodigoSCDE ?? "", dia.ToOADate(), 0, "ERRO", ex.Message.Replace("\n", "-n-"), 0, 0, null, null, DateTime.UtcNow)); } }); }); var linhas = new List { "Tipo;Perfil;Ponto;DiaNum;Minuto;Status;Mensagem;Inseridos;Atualizados;Antes;Depois;Timestamp" }; linhas.AddRange(logs.Select(l => l.ToCsvLine())); File.WriteAllLines(caminhoLog, linhas); } private async Task> ObterMedicoesComRetry( string codigoSCDE, DateTime dataIni, DateTime dataFim, CancellationToken ct, ConcurrentBag logs) { int tentativas = 0; while (tentativas < 3) { try { return await _postgresRepository.ObterMedicoesAsync(codigoSCDE, dataIni, dataFim, ct); } catch (Exception ex) { tentativas++; if (tentativas >= 3) { logs.Add(new LogItem(LogType.Error, "", codigoSCDE, dataIni.ToOADate(), 0, "Erro", ex.Message.Replace("\n", "-n-"), 0, 0, null, null, DateTime.UtcNow)); throw; } int backoff = (int)Math.Pow(2, tentativas) * 1000; Console.WriteLine($"Erro ao acessar Postgres ({ex.Message}), tentativa {tentativas}. Aguardando {backoff / 1000}s..."); await Task.Delay(backoff, ct); } } return new Dictionary<(string, double, int), Medicao>(); } private async Task ProcessarDiaAsync( Perfil perfil, DateTime dia, IDictionary<(string, double, int), Medicao> existentes, Uri endpoint, ConcurrentBag logs, CancellationToken ct) { if (perfil.DataDeMigracao > dia) { Console.WriteLine($"Pular {perfil.CodigoSCDE} - {dia.ToShortDateString()} (antes da migração)"); logs.Add(new LogItem(LogType.Info, perfil.Codigo5Minutos ?? "", perfil.CodigoSCDE ?? "", dia.ToOADate(), 0, "Fora da data de migração", $"Data de migração {perfil.DataDeMigracao} x {dia}", 0, 0, null, null, DateTime.UtcNow)); return; } // Acumulador de medidas (todas as páginas) var acumulador = new List(); int paginaAtual = 1; int totalPaginas = 1; while (paginaAtual <= totalPaginas) { int tentativas = 0; bool sucesso = false; while (tentativas < 5 && !sucesso) { try { string payload = Xml_requisicao(dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, paginaAtual); var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml"); // Aguarda token do rate limiter antes de cada requisição await _rateLimiter.WaitAsync(ct); HttpResponseMessage response; string resposta; using (var client = CreateHttpClient()) { response = await client.PostAsync(endpoint, conteudo, ct); resposta = await response.Content.ReadAsStringAsync(); } if ((int)response.StatusCode >= 400) { try { SoapHelper.VerificarRespostaSOAP(resposta); } catch (SoapFaultException ex) { if (ex.ErrorCode == "2003") { // limite de requisições atingido -> aguardar tick de janela e tentar novamente a mesma página var now = DateTime.UtcNow; var delay = 60000 - (now.Second * 1000 + now.Millisecond); Console.WriteLine($"!! Limite de requisições atingido. Aguardando até {DateTime.Now.AddMilliseconds(delay)}"); await Task.Delay(delay, ct); continue; // não conta como tentativa extra; re-tenta a mesma página } if (ex.ErrorCode == "4001" || ex.ErrorCode == "2001") { // erro persistente, registra e interrompe processamento deste dia/ponto logs.Add(new LogItem(LogType.Error, perfil.Codigo5Minutos ?? "", perfil.CodigoSCDE ?? "", dia.ToOADate(), 0, "SOAP Fault", $"{ex.ErrorCode};{ex.ErrorMessage.Replace("\n", "-n-")}", 0, 0, null, null, DateTime.UtcNow)); return; } throw; } } // Parse e acumula medidas desta página var doc = XDocument.Parse(resposta); XNamespace ns = "http://xmlns.energia.org.br/BO/v2"; if (paginaAtual == 1) { int.TryParse(doc.Descendants().FirstOrDefault(e => e.Name.LocalName == "totalPaginas")?.Value, out totalPaginas); } var medidasPagina = doc.Descendants(ns + "medida") .Where(x => (string)x.Element(ns + "tipoEnergia") == "L") .ToList(); acumulador.AddRange(medidasPagina); // página processada com sucesso sucesso = true; paginaAtual++; } catch (Exception ex) { tentativas++; if (tentativas >= 5) { logs.Add(new LogItem(LogType.Error, perfil.Codigo5Minutos ?? "", perfil.CodigoSCDE ?? "", dia.ToOADate(), 0, "Erro", ex.Message.Replace("\n", "-n-"), 0, 0, null, null, DateTime.UtcNow)); // aborta o processamento do dia após falhas repetidas na mesma página return; } else { int backoff = (int)Math.Pow(2.4, tentativas) * 1000; Console.WriteLine($"Erro na requisição (página {paginaAtual}) ({ex.Message}), tentativa {tentativas}. Aguardando {backoff / 1000}s..."); await Task.Delay(backoff, ct); } } } // fim tentativasPagina } // fim while paginas // ao final de todas as páginas, processa o XML acumulado try { await ProcessarXMLAsync(acumulador, dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, existentes, ct, logs); } catch (Exception ex) { logs.Add(new LogItem(LogType.Error, perfil.Codigo5Minutos ?? "", perfil.CodigoSCDE ?? "", dia.ToOADate(), 0, "Erro", ex.Message.Replace("\n", "-n-"), 0, 0, null, null, DateTime.UtcNow)); } } private async Task ProcessarXMLAsync( List acumulador, DateTime dia, string perfil, string ponto, IDictionary<(string, double, int), Medicao> existentes, CancellationToken ct, ConcurrentBag? logs = null) { logs ??= new ConcurrentBag(); XNamespace ns = "http://xmlns.energia.org.br/BO/v2"; var medidasProcessadas = acumulador .Select(m => { string origem = m.Element(ns + "coletaMedicao")?.Element(ns + "tipo")?.Element(ns + "nome")?.Value ?? ""; string pontoMed = m.Element(ns + "medidor")?.Element(ns + "codigo")?.Value ?? ""; DateTime data = DateTime.Parse(m.Element(ns + "data")?.Value ?? ""); double diaNum = (data.ToOADate() - data.ToOADate() % 1); int minuto = data.Hour * 60 + data.Minute; if (minuto == 0) { minuto = 1440; diaNum--; } double.TryParse(m.Element(ns + "energiaAtiva")?.Element(ns + "consumo")?.Element(ns + "valor")?.Value, NumberStyles.Any, CultureInfo.InvariantCulture, out double ativa_c); double.TryParse(m.Element(ns + "energiaAtiva")?.Element(ns + "geracao")?.Element(ns + "valor")?.Value, NumberStyles.Any, CultureInfo.InvariantCulture, out double ativa_g); double.TryParse(m.Element(ns + "energiaReativa")?.Element(ns + "consumo")?.Element(ns + "valor")?.Value, NumberStyles.Any, CultureInfo.InvariantCulture, out double reat_c); double.TryParse(m.Element(ns + "energiaReativa")?.Element(ns + "geracao")?.Element(ns + "valor")?.Value, NumberStyles.Any, CultureInfo.InvariantCulture, out double reat_g); return new Medicao( pontoMed, diaNum, minuto, origem, ativa_c, ativa_g, reat_c, reat_g ); }) .ToList(); var minutosEsperados = new[] { 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55, 60 }; var medidasPorHora = medidasProcessadas .GroupBy(m => new { m.Ponto, m.DiaNum, Hora = (m.Minuto - 5) / 60 }) .ToList(); var medidasComEstimativa = new List(); for (int hora = 0; hora < 24; hora++) { var grupoHora = medidasPorHora.Where(h => h.Key.Hora == hora).ToList(); var lista = grupoHora.SelectMany(g => g).OrderBy(m => m.Minuto).ToList(); // Separar por origem var logicas = lista.Where(m => m.Origem == "Inspeção Lógica").ToList(); var diarias = lista.Where(m => m.Origem == "Coleta Diária").ToList(); // Regra de prioridade List selecionados; if (logicas.Count == 12 || logicas.Count > diarias.Count) { selecionados = logicas; } else { selecionados = diarias; } var minutosPresentes = selecionados.Select(m => m.Minuto).ToHashSet(); var minutosEsperadosAbsolutos = minutosEsperados.Select(m => m + (60 * hora)).ToList(); var faltantes = minutosEsperadosAbsolutos.Except(minutosPresentes).OrderBy(m => m).ToList(); var estimadas = new List(); foreach (var faltante in faltantes) { if (faltantes.Count > 3) { // Se mais de 3 faltantes na hora, não faz estimativa var estimada = new Medicao( ponto + "P", (dia.ToOADate() - dia.ToOADate() % 1), faltante, "Faltante", null, null, null, null ); estimadas.Add(estimada); continue; } else { var ativaConsumo = selecionados.Average(m => m.AtivaConsumo); var ativaGeracao = selecionados.Average(m => m.AtivaGeracao); var reativaConsumo = selecionados.Average(m => m.ReativaConsumo); var reativaGeracao = selecionados.Average(m => m.ReativaGeracao); var estimada = new Medicao( grupoHora.First().Key.Ponto, grupoHora.First().Key.DiaNum, faltante, "Estimado", ativaConsumo, ativaGeracao, reativaConsumo, reativaGeracao ); estimadas.Add(estimada); } } // Adiciona todos (originais + estimados) ao resultado final medidasComEstimativa.AddRange(selecionados); medidasComEstimativa.AddRange(estimadas); } var novos = new List(); var alterados = new List(); foreach (var m in medidasComEstimativa) { var chave = (m.Ponto, m.DiaNum, m.Minuto); if (!existentes.TryGetValue(chave, out var existente)) { novos.Add(m); } else { if (existente.Origem != m.Origem || Math.Round(existente.AtivaConsumo ?? 0, 10) != Math.Round(m.AtivaConsumo ?? 0, 10) || Math.Round(existente.AtivaGeracao ?? 0, 10) != Math.Round(m.AtivaGeracao ?? 0, 10) || Math.Round(existente.ReativaConsumo ?? 0, 10) != Math.Round(m.ReativaConsumo ?? 0, 10) || Math.Round(existente.ReativaGeracao ?? 0, 10) != Math.Round(m.ReativaGeracao ?? 0, 10)) { alterados.Add(m); // log detalhado por medição alterada logs.Add(new LogItem( LogType.UpdateMeasurement, perfil, m.Ponto, m.DiaNum, m.Minuto, "ALTERADO", "Medição alterada (antes x depois)", 0, 1, existente, m, DateTime.UtcNow)); } } } if (novos.Any()) { await _postgresRepository.InserirMedicoesAsync(novos, ct); Console.WriteLine($"Ponto {ponto}. Dia {dia:dd/MM/yyyy}. {novos.Count:D3} registros inseridos."); logs.Add(new LogItem(LogType.Operation, perfil, ponto, dia.ToOADate(), 0, "OK", "Novos", novos.Count, 0, null, null, DateTime.UtcNow)); } if (alterados.Any()) { await _postgresRepository.AtualizarMedicoesAsync(alterados, ct); Console.WriteLine($"Ponto {ponto}. Dia {dia:dd/MM/yyyy}. {alterados.Count:D3} registros atualizados."); // logs.Add(new LogItem(LogType.Operation, perfil, ponto, dia.ToOADate(), 0, "OK", "Atualizados", 0, alterados.Count, null, null, DateTime.UtcNow)); } if (!novos.Any() && !alterados.Any()) { Console.WriteLine($"Ponto {ponto}. Dia {dia:dd/MM/yyyy}. 000 registros alterados."); logs.Add(new LogItem(LogType.Info, perfil, ponto, dia.ToOADate(), 0, "OK", "Sem alterações", 0, 0, null, null, DateTime.UtcNow)); } } private static string Xml_requisicao(DateTime data_req, string perfil, string cod_ponto, int pagina) { string cam_ent, tex_req, sdat_req; cam_ent = @"X:\Back\Plataforma de Integração CCEE\RequestPaginate.txt"; cod_ponto += "P"; sdat_req = data_req.ToString("yyyy-MM-ddT00:00:00"); tex_req = File.ReadAllText(cam_ent); tex_req = tex_req.Replace("DATAALTERADA", sdat_req); tex_req = tex_req.Replace("PONTOMEDICAO", cod_ponto); tex_req = tex_req.Replace("CODPERFIL", perfil); tex_req = tex_req.Replace("PAGNUM", pagina.ToString()); return tex_req; } private static HttpClient CreateHttpClient() { // Configura o HttpClientHandler para ignorar erros SSL var handler = new HttpClientHandler { ServerCertificateCustomValidationCallback = (message, cert, chain, errors) => true }; // Adiciona o certificado SSL handler.ClientCertificates.Add(new X509Certificate2(@"X:\Back\APP Smart\Certificado\cert_ssl.pfx", "appsmart")); return new HttpClient(handler); } // Token-bucket simples: emite tokens a cada intervalo (distribui as requisições ao longo do tempo) private class TokenBucketRateLimiter : IDisposable { private readonly Channel _channel; private readonly CancellationTokenSource _cts = new(); private readonly Task _producer; public TokenBucketRateLimiter(int tokensPerMinute, int capacity = 400, int initialTokens = 0) { if (tokensPerMinute <= 0) throw new ArgumentOutOfRangeException(nameof(tokensPerMinute)); _channel = Channel.CreateBounded(new BoundedChannelOptions(capacity) { SingleWriter = true, SingleReader = false, FullMode = BoundedChannelFullMode.DropWrite }); // opcional: preencher tokens iniciais (0 evita burst inicial) for (int i = 0; i < Math.Min(initialTokens, capacity); i++) _channel.Writer.TryWrite(true); var intervalMs = (int)Math.Ceiling(60000.0 / tokensPerMinute); // ex: 400 -> 150ms _producer = Task.Run(async () => { try { while (!_cts.Token.IsCancellationRequested) { await Task.Delay(intervalMs, _cts.Token); // tenta escrever; se cheio, descarta (evita acumular tokens além da capacidade) _channel.Writer.TryWrite(true); } } catch (OperationCanceledException) { } }, _cts.Token); } public ValueTask WaitAsync(CancellationToken ct) => _channel.Reader.ReadAsync(ct); public void Dispose() { _cts.Cancel(); _channel.Writer.TryComplete(); try { _producer?.GetAwaiter().GetResult(); } catch { } _cts.Dispose(); } } } }