From c98aec2c24c711d185106b5c69c2bca8dd30fba6 Mon Sep 17 00:00:00 2001 From: Adriano Serighelli Date: Fri, 24 Oct 2025 14:18:49 -0300 Subject: [PATCH] =?UTF-8?q?Melhorias=20em=20concorr=C3=AAncia,=20logs=20e?= =?UTF-8?q?=20tratamento=20de=20erros?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Adicionado `SemaphoreSlim` para controle de requisições HTTP simultâneas. - Implementado log detalhado de operações com `ConcurrentBag`. - Melhorado tratamento de erros com mensagens enriquecidas. - Refatorados métodos para suportar concorrência e paginação. - Removido método `Interpolar` por não ser mais necessário. - Ajustada consulta SQL em `AccessRepository` para novos filtros. - Refatorada atualização em lote no `PostgresRepository`. - Alterado intervalo de datas e suporte a proxy em `Program.cs`. - Melhorias gerais de formatação, comentários e lógica de seleção. --- Application/ProcessarMedicoesUseCase.cs | 146 ++++++++++++++---------- Infrastructure/AccessRepository.cs | 6 +- Infrastructure/PostgresRepository.cs | 64 +++-------- Presentation/Program.cs | 20 +++- 4 files changed, 119 insertions(+), 117 deletions(-) diff --git a/Application/ProcessarMedicoesUseCase.cs b/Application/ProcessarMedicoesUseCase.cs index 63f4b0c..36ed885 100644 --- a/Application/ProcessarMedicoesUseCase.cs +++ b/Application/ProcessarMedicoesUseCase.cs @@ -4,7 +4,6 @@ using System.Text; using System.Xml.Linq; using Domain; using Infrastructure; -using static System.Runtime.InteropServices.JavaScript.JSType; namespace Application { @@ -30,35 +29,52 @@ namespace Application public async Task ExecuteAsync(DateTime dataIni, DateTime dataFim, string caminhoLog, CancellationToken ct) { var errosPersistentes = new ConcurrentBag(); + var operacoesLog = new ConcurrentBag(); var perfis = (await _accessRepository.ObterPerfisAsync(ct)).ToList(); _httpClient.DefaultRequestHeaders.Add("SOAPAction", "listarMedidaCincoMinutos"); var endpoint = new Uri("https://servicos.ccee.org.br/ws/v2/MedidaCincoMinutosBSv2"); var datas = Enumerable.Range(0, (dataFim - dataIni).Days).Select(i => dataIni.AddDays(i)); - await Parallel.ForEachAsync(perfis, async (perfil, ct) => + // Controle explcito de concorrncia HTTP (assncrona) + // Ajuste conforme latncia observada (ex.: 32, 48, 64). + var maxConcurrentRequests = 10; + using var httpSemaphore = new SemaphoreSlim(maxConcurrentRequests); + + await Parallel.ForEachAsync(perfis, async (perfil, ctPerfil) => { Console.WriteLine($"{DateTime.Now}: Iniciado ponto {perfil.CodigoSCDE}"); - if (perfil.Codigo5Minutos == "0" || perfil.Codigo5Minutos == string.Empty) + if (perfil.Codigo5Minutos == "0" || perfil.Codigo5Minutos == string.Empty || perfil.Codigo5Minutos == null) { Console.WriteLine($"Pular {perfil.CodigoSCDE} - (cod 5 min pendente)"); - errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE}; cod_5min pendente"); + errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dataIni.ToOADate()};ERRO;cod_5min pendente"); return; } - var existentes = await ObterMedicoesComRetry(perfil.CodigoSCDE, dataIni, dataFim, ct, errosPersistentes); + var existentes = await ObterMedicoesComRetry(perfil.CodigoSCDE, dataIni, dataFim, ctPerfil, errosPersistentes); - foreach (DateTime dia in datas) + // Paraleliza os dias deste perfil; o semforo limita as requisies simultneas + await Parallel.ForEachAsync(datas, ctPerfil, async (dia, ctDia) => { - await ProcessarDiaAsync(perfil, dia, existentes, endpoint, errosPersistentes, ct); - } + try + { + await ProcessarDiaAsync(perfil, dia, existentes, endpoint, errosPersistentes, operacoesLog, ctDia, httpSemaphore); + } + catch (Exception ex) + { + operacoesLog.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};ERRO;{ex.Message.Replace("\n", "-n-")}"); + } + }); + Console.WriteLine($"{DateTime.Now}: Finalizado ponto {perfil.CodigoSCDE}"); }); - if (errosPersistentes.Count > 0) - { - File.WriteAllLines(caminhoLog, new[] { "Perfil;Ponto;Status;Message" }.Concat(errosPersistentes)); - } + // Cabealho do log + var linhasLog = new List { "Perfil;Ponto;DiaNum;Status;Mensagem;Inseridos;Atualizados" }; + linhasLog.AddRange(operacoesLog); + linhasLog.AddRange(errosPersistentes); + + File.WriteAllLines(caminhoLog, linhasLog); } private async Task> ObterMedicoesComRetry( @@ -76,7 +92,7 @@ namespace Application tentativas++; if (tentativas >= 3) { - errosPersistentes.Add($"{codigoSCDE};Erro;{ex.Message.Replace("\n", "-n-")}"); + errosPersistentes.Add($";{codigoSCDE};{dataIni.ToOADate()};Erro;{ex.Message.Replace("\n", "-n-")}"); throw; } int backoff = (int)Math.Pow(2, tentativas) * 1000; @@ -93,7 +109,9 @@ namespace Application IDictionary<(string, double, int), Medicao> existentes, Uri endpoint, ConcurrentBag errosPersistentes, - CancellationToken ct) + ConcurrentBag operacoesLog, + CancellationToken ct, + SemaphoreSlim httpSemaphore) { if (perfil.DataDeMigracao > dia) { @@ -111,9 +129,21 @@ namespace Application string payload = Xml_requisicao(dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, 1); var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml"); + // SEGURAR O SEMFORO APENAS NA CHAMADA HTTP await _rateLimiter.WaitAsync(ct); - using var response = await _httpClient.PostAsync(endpoint, conteudo, ct); - string resposta = await response.Content.ReadAsStringAsync(); + + HttpResponseMessage response; + string resposta; + await httpSemaphore.WaitAsync(ct); + try + { + response = await _httpClient.PostAsync(endpoint, conteudo, ct); + resposta = await response.Content.ReadAsStringAsync(); + } + finally + { + httpSemaphore.Release(); + } if ((int)response.StatusCode >= 400) { @@ -125,6 +155,7 @@ namespace Application { if (ex.ErrorCode == "2003") { + // Aguarda o tick de janela SEM estar segurando o semforo var now = DateTime.UtcNow; var delay = 60000 - (now.Second * 1000 + now.Millisecond); Console.WriteLine($"!! Limite de requisies atingido. Aguardando at {DateTime.Now.AddMilliseconds(delay)}"); @@ -133,14 +164,14 @@ namespace Application } if (ex.ErrorCode == "4001" || ex.ErrorCode == "2001") { - errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};SOAP Fault: {ex.ErrorCode};{ex.ErrorMessage.Replace("\n", "-n-")}"); + errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};SOAP Fault: {ex.ErrorCode};{ex.ErrorMessage.Replace("\n", "-n-")}"); break; } throw; } } - await ProcessarXMLAsync(resposta, dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, existentes, ct, 1); + await ProcessarXMLAsync(resposta, dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, existentes, ct, endpoint, httpSemaphore, 1, null, 1, operacoesLog); sucesso = true; } catch (Exception ex) @@ -148,7 +179,7 @@ namespace Application tentativas++; if (tentativas >= 5) { - errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};Erro;{ex.Message.Replace("\n", "-n-")}"); + errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};Erro;{ex.Message.Replace("\n", "-n-")}"); } else { @@ -159,6 +190,7 @@ namespace Application } } } + private async Task ProcessarXMLAsync( string xml, DateTime dia, @@ -166,9 +198,12 @@ namespace Application string ponto, IDictionary<(string, double, int), Medicao> existentes, CancellationToken ct, + Uri endpoint, + SemaphoreSlim httpSemaphore, int paginaAtual = 1, List? acumulador = null, - int totalPaginas = 1) + int totalPaginas = 1, + ConcurrentBag? operacoesLog = null) { var doc = XDocument.Parse(xml); XNamespace ns = "http://xmlns.energia.org.br/BO/v2"; @@ -184,14 +219,25 @@ namespace Application if (paginaAtual < totalPaginas) { - // Requisita prxima pgina + // Prxima pgina: adquire o semforo apenas para o HTTP e libera logo aps string payload = Xml_requisicao(dia, perfil, ponto, paginaAtual + 1); var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml"); - await _rateLimiter.WaitAsync(ct); - using var resp = await _httpClient.PostAsync("https://servicos.ccee.org.br/ws/v2/MedidaCincoMinutosBSv2", conteudo, ct); - string proxXml = await resp.Content.ReadAsStringAsync(); - await ProcessarXMLAsync(proxXml, dia, perfil, ponto, existentes, ct, paginaAtual + 1, acumulador, totalPaginas); + await _rateLimiter.WaitAsync(ct); + + string proxXml; + await httpSemaphore.WaitAsync(ct); + try + { + using var resp = await _httpClient.PostAsync(endpoint, conteudo, ct); + proxXml = await resp.Content.ReadAsStringAsync(); + } + finally + { + httpSemaphore.Release(); + } + + await ProcessarXMLAsync(proxXml, dia, perfil, ponto, existentes, ct, endpoint, httpSemaphore, paginaAtual + 1, acumulador, totalPaginas, operacoesLog); return; } @@ -206,13 +252,13 @@ namespace Application if (minuto == 0) { minuto = 1440; diaNum--; } double.TryParse(m.Element(ns + "energiaAtiva")?.Element(ns + "consumo")?.Element(ns + "valor")?.Value, - NumberStyles.Any, CultureInfo.InvariantCulture, out double ativa_c); + NumberStyles.Any, CultureInfo.InvariantCulture, out double ativa_c); double.TryParse(m.Element(ns + "energiaAtiva")?.Element(ns + "geracao")?.Element(ns + "valor")?.Value, - NumberStyles.Any, CultureInfo.InvariantCulture, out double ativa_g); + NumberStyles.Any, CultureInfo.InvariantCulture, out double ativa_g); double.TryParse(m.Element(ns + "energiaReativa")?.Element(ns + "consumo")?.Element(ns + "valor")?.Value, - NumberStyles.Any, CultureInfo.InvariantCulture, out double reat_c); + NumberStyles.Any, CultureInfo.InvariantCulture, out double reat_c); double.TryParse(m.Element(ns + "energiaReativa")?.Element(ns + "geracao")?.Element(ns + "valor")?.Value, - NumberStyles.Any, CultureInfo.InvariantCulture, out double reat_g); + NumberStyles.Any, CultureInfo.InvariantCulture, out double reat_g); return new Medicao( pontoMed, @@ -245,7 +291,7 @@ namespace Application // Regra de prioridade List selecionados; - if (logicas.Count > diarias.Count) + if (logicas.Count == 12 || logicas.Count > diarias.Count) { selecionados = logicas; } @@ -332,14 +378,22 @@ namespace Application { await _postgresRepository.InserirMedicoesAsync(novos, ct); Console.WriteLine($"Inserido {novos.Count} registros. Ponto {ponto}. Dia {dia}"); + operacoesLog?.Add($"{perfil};{ponto};{dia.ToOADate()};OK;Novos;{novos.Count};0"); } if (alterados.Any()) { await _postgresRepository.AtualizarMedicoesAsync(alterados, ct); Console.WriteLine($"Atualizado {alterados.Count} registros. Ponto {ponto}. Dia {dia}"); + operacoesLog?.Add($"{perfil};{ponto};{dia.ToOADate()};OK;Atualizados;0;{alterados.Count}"); + } + if (!novos.Any() && !alterados.Any()) + { + Console.WriteLine($"Nenhuma alterao. Ponto {ponto}. Dia {dia}"); + operacoesLog?.Add($"{perfil};{ponto};{dia.ToOADate()};OK;Sem alteraes;0;0"); } } + private static string Xml_requisicao(DateTime data_req, string perfil, string cod_ponto, int pagina) { string cam_ent, tex_req, sdat_req; @@ -353,37 +407,5 @@ namespace Application tex_req = tex_req.Replace("PAGNUM", pagina.ToString()); return tex_req; } - private static double? Interpolar( - double? xAnterior, double? yAnterior, - double? xPosterior, double? yPosterior, - double xProcurado) - { - if (xAnterior.HasValue && yAnterior.HasValue && - (!xPosterior.HasValue || !yPosterior.HasValue)) - { - return yAnterior.Value; - } - - if (xPosterior.HasValue && yPosterior.HasValue && - (!xAnterior.HasValue || !yAnterior.HasValue)) - { - return yPosterior.Value; - } - - if (xAnterior.HasValue && yAnterior.HasValue && - xPosterior.HasValue && yPosterior.HasValue) - { - if (xPosterior.Value == xAnterior.Value) - throw new ArgumentException("xAnterior e xPosterior no podem ser iguais (diviso por zero)."); - - double yProcurado = yAnterior.Value + - ((yPosterior.Value - yAnterior.Value) / (xPosterior.Value - xAnterior.Value)) * - (xProcurado - xAnterior.Value); - - return yProcurado; - } - - return null; - } } } \ No newline at end of file diff --git a/Infrastructure/AccessRepository.cs b/Infrastructure/AccessRepository.cs index 9c4956c..e2ad28d 100644 --- a/Infrastructure/AccessRepository.cs +++ b/Infrastructure/AccessRepository.cs @@ -20,10 +20,10 @@ namespace Infrastructure await connection.OpenAsync(ct); //string sql = $"SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 AND Unidade_gerenciada ORDER BY cod_smart_unidade"; - //string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and (Cliente = 'RMC ALIMENTOS' OR Cliente = 'FERREIRA SUPERMERCADO' OR Cliente = 'VANGUARDA ALIMENTOS') AND Unidade_gerenciada ORDER BY PerfilCCEE"; + string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and (Cliente = 'RMC ALIMENTOS' OR Cliente = 'FERREIRA SUPERMERCADO' OR Cliente = 'VANGUARDA ALIMENTOS') AND Unidade_gerenciada ORDER BY PerfilCCEE"; //string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Cliente = 'ABEVÊ' and Unidade = 'ABV LOJA 29 - COXIM' AND Unidade_gerenciada ORDER BY PerfilCCEE"; - string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Cliente = 'SIDERQUÍMICA' AND Unidade_gerenciada ORDER BY PerfilCCEE"; - //string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Codigo_SCDE = 'DFLBCEENTR101'"; + //string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Cliente = 'RMC ALIMENTOS' AND Unidade_gerenciada ORDER BY PerfilCCEE"; + //string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Codigo_SCDE = '3016021620'"; using var command = new OleDbCommand(sql, connection); using var reader = await command.ExecuteReaderAsync(ct); diff --git a/Infrastructure/PostgresRepository.cs b/Infrastructure/PostgresRepository.cs index e4555c1..ca72c47 100644 --- a/Infrastructure/PostgresRepository.cs +++ b/Infrastructure/PostgresRepository.cs @@ -75,65 +75,35 @@ namespace Infrastructure await using var connection = await _dataSource.OpenConnectionAsync(ct); using var batch = new NpgsqlBatch(connection); - // Gerar os parâmetros dinamicamente, mantendo a abordagem parametrizada + // Criar os valores da atualização, como se fosse uma tabela temporária var valores = medicoes - .Select((m, index) => new - { - Index = index, - Ponto = m.Ponto, - DiaNum = m.DiaNum, - Minuto = m.Minuto, - Origem = m.Origem, - AtivaConsumo = m.AtivaConsumo, - AtivaGeracao = m.AtivaGeracao, - ReativaConsumo = m.ReativaConsumo, - ReativaGeracao = m.ReativaGeracao - }) + .Select(m => $"('{m.Ponto}', {m.DiaNum}, {m.Minuto}, '{m.Origem}', " + + $"{(m.AtivaConsumo.HasValue ? m.AtivaConsumo.Value.ToString(CultureInfo.InvariantCulture) : "NULL")}, " + + $"{(m.AtivaGeracao.HasValue ? m.AtivaGeracao.Value.ToString(CultureInfo.InvariantCulture) : "NULL")}, " + + $"{(m.ReativaConsumo.HasValue ? m.ReativaConsumo.Value.ToString(CultureInfo.InvariantCulture) : "NULL")}, " + + $"{(m.ReativaGeracao.HasValue ? m.ReativaGeracao.Value.ToString(CultureInfo.InvariantCulture) : "NULL")})") .ToList(); - var query = @" + // Gerar a query dinâmica com os valores em formato de tabela temporária + var query = $@" UPDATE med_5min SET origem = nv.origem, - ativa_consumo = nv.ativa_consumo, - ativa_geracao = nv.ativa_geracao, - reativa_consumo = nv.reativa_consumo, - reativa_geracao = nv.reativa_geracao - FROM (VALUES"; - - // Adicionar os valores para o `VALUES` - for (int i = 0; i < valores.Count; i++) - { - query += $" (@ponto_{i}, @dia_num_{i}, @minuto_{i}, @origem_{i}, " + - $"@ativa_consumo_{i}, @ativa_geracao_{i}, @reativa_consumo_{i}, @reativa_geracao_{i})"; - if (i < valores.Count - 1) - { - query += ", "; - } - } - - query += @") AS nv (ponto, dia_num, minuto, origem, ativa_consumo, ativa_geracao, reativa_consumo, reativa_geracao) + ativa_consumo = CAST(nv.ativa_consumo AS numeric), + ativa_geracao = CAST(nv.ativa_geracao AS numeric), + reativa_consumo = CAST(nv.reativa_consumo AS numeric), + reativa_geracao = CAST(nv.reativa_geracao AS numeric) + FROM (VALUES + {string.Join(",", valores)} + ) AS nv (ponto, dia_num, minuto, origem, ativa_consumo, ativa_geracao, reativa_consumo, reativa_geracao) WHERE med_5min.ponto = nv.ponto AND med_5min.dia_num = nv.dia_num AND med_5min.minuto = nv.minuto;"; - // Agora, vamos adicionar os parâmetros à query + // Criação do comando NpgsqlBatchCommand var cmd = new NpgsqlBatchCommand(query); - - foreach (var valor in valores) - { - cmd.Parameters.AddWithValue($"ponto_{valor.Index}", valor.Ponto); - cmd.Parameters.AddWithValue($"dia_num_{valor.Index}", valor.DiaNum); - cmd.Parameters.AddWithValue($"minuto_{valor.Index}", valor.Minuto); - cmd.Parameters.AddWithValue($"origem_{valor.Index}", valor.Origem); - cmd.Parameters.AddWithValue($"ativa_consumo_{valor.Index}", valor.AtivaConsumo ?? (object)DBNull.Value); - cmd.Parameters.AddWithValue($"ativa_geracao_{valor.Index}", valor.AtivaGeracao ?? (object)DBNull.Value); - cmd.Parameters.AddWithValue($"reativa_consumo_{valor.Index}", valor.ReativaConsumo ?? (object)DBNull.Value); - cmd.Parameters.AddWithValue($"reativa_geracao_{valor.Index}", valor.ReativaGeracao ?? (object)DBNull.Value); - } - batch.BatchCommands.Add(cmd); - // Executar a query + // Executar o comando await batch.ExecuteNonQueryAsync(ct); } } diff --git a/Presentation/Program.cs b/Presentation/Program.cs index 68ec52d..6bbb86e 100644 --- a/Presentation/Program.cs +++ b/Presentation/Program.cs @@ -1,6 +1,9 @@ using System.Data.OleDb; using Application; using Infrastructure; +using System.Net; +using System.Net.Security; +using System.Security.Cryptography.X509Certificates; class Program { @@ -13,7 +16,7 @@ class Program //DateTime dataIni = new DateTime(inicio.Year, inicio.Month, 1); //DateTime dataFim = new DateTime(inicio.Year, inicio.Month, inicio.Day); DateTime dataIni = new DateTime(inicio.Year, 10, 01); - DateTime dataFim = new DateTime(inicio.Year, 10, 16); + DateTime dataFim = new DateTime(inicio.Year, 10, 23); // Configuração de dependências (pode usar um container DI depois) var postgresRepo = new PostgresRepository(PG_CONN_STRING_PROD); @@ -21,16 +24,23 @@ class Program var httpClient = new HttpClient(new HttpClientHandler { ClientCertificateOptions = ClientCertificateOption.Automatic, - //Proxy = new WebProxy("127.0.0.1", 8888), - //UseProxy = true, - //ServerCertificateCustomValidationCallback = (HttpRequestMessage req, X509Certificate2? cert, X509Chain? chain, SslPolicyErrors errors) => true + Proxy = new WebProxy("127.0.0.1", 8888), + UseProxy = true, + ServerCertificateCustomValidationCallback = (HttpRequestMessage req, X509Certificate2? cert, X509Chain? chain, SslPolicyErrors errors) => true }); + var rateLimiter = new RateLimiter(400, TimeSpan.FromMinutes(1)); var useCase = new ProcessarMedicoesUseCase(postgresRepo, accessRepo, httpClient, rateLimiter); await useCase.ExecuteAsync(dataIni, dataFim, caminhoLog, CancellationToken.None); Console.WriteLine($"Concluído. Tempo total: {DateTime.Now - inicio}"); - Console.ReadKey(); + string input = string.Empty; + + while (input.ToLower() != "fim") + { + Console.WriteLine("Digite 'fim' para finalizar:"); + input = Console.ReadLine(); + } } } \ No newline at end of file