diff --git a/Application/ProcessarMedicoesUseCase.cs b/Application/ProcessarMedicoesUseCase.cs index c389551..0126b90 100644 --- a/Application/ProcessarMedicoesUseCase.cs +++ b/Application/ProcessarMedicoesUseCase.cs @@ -2,16 +2,18 @@ using System.Collections.Concurrent; using System.Globalization; using System.Security.Cryptography.X509Certificates; using System.Text; +using System.Threading.Channels; using System.Xml.Linq; using Domain; using Infrastructure; namespace Application { - public class ProcessarMedicoesUseCase + public class ProcessarMedicoesUseCase : IDisposable { private readonly IPostgresRepository _postgresRepository; private readonly IAccessRepository _accessRepository; + private readonly TokenBucketRateLimiter _rateLimiter; public ProcessarMedicoesUseCase( IPostgresRepository postgresRepository, @@ -19,6 +21,13 @@ namespace Application { _postgresRepository = postgresRepository; _accessRepository = accessRepository; + // 400 requisições por minuto + _rateLimiter = new TokenBucketRateLimiter(4000, capacity: 4000); + } + + public void Dispose() + { + _rateLimiter?.Dispose(); } public async Task ExecuteAsync(DateTime dataIni, DateTime dataFim, string caminhoLog, CancellationToken ct) @@ -32,7 +41,7 @@ namespace Application await Parallel.ForEachAsync(perfis, async (perfil, ctPerfil) => { - Console.WriteLine($"{DateTime.Now}: Iniciado ponto {perfil.CodigoSCDE}"); + //Console.WriteLine($"{DateTime.Now}: Iniciado ponto {perfil.CodigoSCDE}"); if (perfil.Codigo5Minutos == "0" || perfil.Codigo5Minutos == string.Empty || perfil.Codigo5Minutos == null) { Console.WriteLine($"Pular {perfil.CodigoSCDE} - (cod 5 min pendente)"); @@ -41,8 +50,8 @@ namespace Application } var existentes = await ObterMedicoesComRetry(perfil.CodigoSCDE, dataIni, dataFim, ctPerfil, errosPersistentes); - - // Paraleliza os dias deste perfil; o semáforo limita as requisições simultâneas + + // Paraleliza os dias deste perfil; o semáforo limita as requisições simultâneas await Parallel.ForEachAsync(datas, ctPerfil, async (dia, ctDia) => { try @@ -55,10 +64,10 @@ namespace Application } }); - Console.WriteLine($"{DateTime.Now}: Finalizado ponto {perfil.CodigoSCDE}"); + //Console.WriteLine($"{DateTime.Now}: Finalizado ponto {perfil.CodigoSCDE}"); }); - // Cabeçalho do log + // Cabeçalho do log var linhasLog = new List { "Perfil;Ponto;DiaNum;Status;Mensagem;Inseridos;Atualizados" }; linhasLog.AddRange(operacoesLog); linhasLog.AddRange(errosPersistentes); @@ -95,7 +104,8 @@ namespace Application private async Task ProcessarDiaAsync( Perfil perfil, DateTime dia, - IDictionary<(string, double, int), Medicao> existentes, + IDictionary<(string, double, int), + Medicao> existentes, Uri endpoint, ConcurrentBag errosPersistentes, ConcurrentBag operacoesLog, @@ -103,118 +113,129 @@ namespace Application { if (perfil.DataDeMigracao > dia) { - Console.WriteLine($"Pular {perfil.CodigoSCDE} - {dia.ToShortDateString()} (antes da migração)"); - errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};Fora da data de migração {perfil.DataDeMigracao} x {dia}"); + Console.WriteLine($"Pular {perfil.CodigoSCDE} - {dia.ToShortDateString()} (antes da migração)"); + errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};Fora da data de migração {perfil.DataDeMigracao} x {dia}"); return; } - int tentativas = 0; - bool sucesso = false; - while (tentativas < 5 && !sucesso) + + // Acumulador de medidas (todas as páginas) + var acumulador = new List(); + int paginaAtual = 1; + int totalPaginas = 1; + + while (paginaAtual <= totalPaginas) { - try + int tentativas = 0; + bool sucesso = false; + + while (tentativas < 5 && !sucesso) { - string payload = Xml_requisicao(dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, 1); - var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml"); - - HttpResponseMessage response; - string resposta; - - using (var client = CreateHttpClient()) + try { - response = await client.PostAsync(endpoint, conteudo, ct); - resposta = await response.Content.ReadAsStringAsync(); - } + string payload = Xml_requisicao(dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, paginaAtual); + var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml"); - if ((int)response.StatusCode >= 400) - { - try + // Aguarda token do rate limiter antes de cada requisição + await _rateLimiter.WaitAsync(ct); + + HttpResponseMessage response; + string resposta; + + using (var client = CreateHttpClient()) { - SoapHelper.VerificarRespostaSOAP(resposta); + response = await client.PostAsync(endpoint, conteudo, ct); + resposta = await response.Content.ReadAsStringAsync(); } - catch (SoapFaultException ex) + + if ((int)response.StatusCode >= 400) { - if (ex.ErrorCode == "2003") + try { - // Aguarda o tick de janela SEM estar segurando o semáforo - var now = DateTime.UtcNow; - var delay = 60000 - (now.Second * 1000 + now.Millisecond); - Console.WriteLine($"!! Limite de requisições atingido. Aguardando até {DateTime.Now.AddMilliseconds(delay)}"); - await Task.Delay(delay, ct); - continue; + SoapHelper.VerificarRespostaSOAP(resposta); } - if (ex.ErrorCode == "4001" || ex.ErrorCode == "2001") + catch (SoapFaultException ex) { - errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};SOAP Fault: {ex.ErrorCode};{ex.ErrorMessage.Replace("\n", "-n-")}"); - break; + if (ex.ErrorCode == "2003") + { + // limite de requisições atingido -> aguardar tick de janela e tentar novamente a mesma página + var now = DateTime.UtcNow; + var delay = 60000 - (now.Second * 1000 + now.Millisecond); + Console.WriteLine($"!! Limite de requisições atingido. Aguardando até {DateTime.Now.AddMilliseconds(delay)}"); + await Task.Delay(delay, ct); + continue; // não conta como tentativa extra; re-tenta a mesma página + } + if (ex.ErrorCode == "4001" || ex.ErrorCode == "2001") + { + // erro persistente, registra e interrompe processamento deste dia/ponto + errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};SOAP Fault: {ex.ErrorCode};{ex.ErrorMessage.Replace("\n", "-n-")}"); + return; + } + throw; } - throw; + } + + // Parse e acumula medidas desta página + var doc = XDocument.Parse(resposta); + XNamespace ns = "http://xmlns.energia.org.br/BO/v2"; + + if (paginaAtual == 1) + { + int.TryParse(doc.Descendants().FirstOrDefault(e => e.Name.LocalName == "totalPaginas")?.Value, out totalPaginas); + } + + var medidasPagina = doc.Descendants(ns + "medida") + .Where(x => (string)x.Element(ns + "tipoEnergia") == "L") + .ToList(); + + acumulador.AddRange(medidasPagina); + + // página processada com sucesso + sucesso = true; + paginaAtual++; + } + catch (Exception ex) + { + tentativas++; + if (tentativas >= 5) + { + errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};Erro;{ex.Message.Replace("\n", "-n-")}"); + // aborta o processamento do dia após falhas repetidas na mesma página + return; + } + else + { + int backoff = (int)Math.Pow(2.4, tentativas) * 1000; + Console.WriteLine($"Erro na requisição (página {paginaAtual}) ({ex.Message}), tentativa {tentativas}. Aguardando {backoff / 1000}s..."); + await Task.Delay(backoff, ct); } } + } // fim tentativasPagina + } // fim while paginas - await ProcessarXMLAsync(resposta, dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, existentes, ct, endpoint, 1, null, 1, operacoesLog); - sucesso = true; - } - catch (Exception ex) - { - tentativas++; - if (tentativas >= 5) - { - errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};Erro;{ex.Message.Replace("\n", "-n-")}"); - } - else - { - int backoff = (int)Math.Pow(2.4, tentativas) * 1000; - Console.WriteLine($"Erro na requisição ({ex.Message}), tentativa {tentativas}. Aguardando {backoff / 1000}s..."); - await Task.Delay(backoff, ct); - } - } + // ao final de todas as páginas, processa o XML acumulado + try + { + await ProcessarXMLAsync(acumulador, dia, perfil.Codigo5Minutos, perfil.CodigoSCDE, existentes, ct, operacoesLog); + } + catch (Exception ex) + { + errosPersistentes.Add($"{perfil.Codigo5Minutos};{perfil.CodigoSCDE};{dia.ToOADate()};Erro;{ex.Message.Replace("\n", "-n-")}"); } } private async Task ProcessarXMLAsync( - string xml, + List acumulador, DateTime dia, string perfil, string ponto, IDictionary<(string, double, int), Medicao> existentes, CancellationToken ct, - Uri endpoint, - int paginaAtual = 1, - List? acumulador = null, - int totalPaginas = 1, ConcurrentBag? operacoesLog = null) { - var doc = XDocument.Parse(xml); + // Processa as medidas já acumuladas (antes chamadas faziam paginação) XNamespace ns = "http://xmlns.energia.org.br/BO/v2"; - int.TryParse(doc.Descendants().FirstOrDefault(e => e.Name.LocalName == "totalPaginas")?.Value, out totalPaginas); - int.TryParse(doc.Descendants().FirstOrDefault(e => e.Name.LocalName == "numero")?.Value, out paginaAtual); - - var medidas = doc.Descendants(ns + "medida") - .Where(x => (string)x.Element(ns + "tipoEnergia") == "L"); - - acumulador ??= new List(); - acumulador.AddRange(medidas); - - if (paginaAtual < totalPaginas) - { - // Próxima página: adquire o semáforo apenas para o HTTP e libera logo após - string payload = Xml_requisicao(dia, perfil, ponto, paginaAtual + 1); - var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml"); - - string proxXml; - - using (var client = CreateHttpClient()) - { - using var resp = await client.PostAsync(endpoint, conteudo, ct); - proxXml = await resp.Content.ReadAsStringAsync(); - } - - await ProcessarXMLAsync(proxXml, dia, perfil, ponto, existentes, ct, endpoint, paginaAtual + 1, acumulador, totalPaginas, operacoesLog); - return; - } - var medidasProcessadas = acumulador .Select(m => { @@ -260,8 +281,8 @@ namespace Application var lista = grupoHora.SelectMany(g => g).OrderBy(m => m.Minuto).ToList(); // Separar por origem - var logicas = lista.Where(m => m.Origem == "Inspeção Lógica").ToList(); - var diarias = lista.Where(m => m.Origem == "Coleta Diária").ToList(); + var logicas = lista.Where(m => m.Origem == "Inspeção Lógica").ToList(); + var diarias = lista.Where(m => m.Origem == "Coleta Diária").ToList(); // Regra de prioridade List selecionados; @@ -284,7 +305,7 @@ namespace Application { if (faltantes.Count > 3) { - // Se mais de 3 faltantes na hora, não faz estimativa + // Se mais de 3 faltantes na hora, n�o faz estimativa var estimada = new Medicao( ponto + "P", (dia.ToOADate() - dia.ToOADate() % 1), @@ -351,27 +372,27 @@ namespace Application if (novos.Any()) { await _postgresRepository.InserirMedicoesAsync(novos, ct); - Console.WriteLine($"Inserido {novos.Count} registros. Ponto {ponto}. Dia {dia}"); + Console.WriteLine($"Ponto {ponto}. Dia {dia:dd/MM/yyyy}. {novos.Count:D3} registros inseridos."); operacoesLog?.Add($"{perfil};{ponto};{dia.ToOADate()};OK;Novos;{novos.Count};0"); } if (alterados.Any()) { await _postgresRepository.AtualizarMedicoesAsync(alterados, ct); - Console.WriteLine($"Atualizado {alterados.Count} registros. Ponto {ponto}. Dia {dia}"); + Console.WriteLine($"Ponto {ponto}. Dia {dia:dd/MM/yyyy}. {alterados.Count:D3} registros atualizados."); operacoesLog?.Add($"{perfil};{ponto};{dia.ToOADate()};OK;Atualizados;0;{alterados.Count}"); } if (!novos.Any() && !alterados.Any()) { - Console.WriteLine($"Nenhuma alteração. Ponto {ponto}. Dia {dia}"); - operacoesLog?.Add($"{perfil};{ponto};{dia.ToOADate()};OK;Sem alterações;0;0"); + Console.WriteLine($"Ponto {ponto}. Dia {dia:dd/MM/yyyy}. 000 registros alterados."); + operacoesLog?.Add($"{perfil};{ponto};{dia.ToOADate()};OK;Sem alterações;0;0"); } } private static string Xml_requisicao(DateTime data_req, string perfil, string cod_ponto, int pagina) { string cam_ent, tex_req, sdat_req; - cam_ent = @"X:\Back\Plataforma de Integração CCEE\RequestPaginate.txt"; + cam_ent = @"X:\Back\Plataforma de Integração CCEE\RequestPaginate.txt"; cod_ponto += "P"; sdat_req = data_req.ToString("yyyy-MM-ddT00:00:00"); tex_req = File.ReadAllText(cam_ent); @@ -394,5 +415,54 @@ namespace Application return new HttpClient(handler); } + + // Token-bucket simples: emite tokens a cada intervalo (distribui as requisições ao longo do tempo) + private class TokenBucketRateLimiter : IDisposable + { + private readonly Channel _channel; + private readonly CancellationTokenSource _cts = new(); + private readonly Task _producer; + + public TokenBucketRateLimiter(int tokensPerMinute, int capacity = 400, int initialTokens = 0) + { + if (tokensPerMinute <= 0) throw new ArgumentOutOfRangeException(nameof(tokensPerMinute)); + _channel = Channel.CreateBounded(new BoundedChannelOptions(capacity) + { + SingleWriter = true, + SingleReader = false, + FullMode = BoundedChannelFullMode.DropWrite + }); + + // opcional: preencher tokens iniciais (0 evita burst inicial) + for (int i = 0; i < Math.Min(initialTokens, capacity); i++) + _channel.Writer.TryWrite(true); + + var intervalMs = (int)Math.Ceiling(60000.0 / tokensPerMinute); // ex: 400 -> 150ms + + _producer = Task.Run(async () => + { + try + { + while (!_cts.Token.IsCancellationRequested) + { + await Task.Delay(intervalMs, _cts.Token); + // tenta escrever; se cheio, descarta (evita acumular tokens além da capacidade) + _channel.Writer.TryWrite(true); + } + } + catch (OperationCanceledException) { } + }, _cts.Token); + } + + public ValueTask WaitAsync(CancellationToken ct) => _channel.Reader.ReadAsync(ct); + + public void Dispose() + { + _cts.Cancel(); + _channel.Writer.TryComplete(); + try { _producer?.GetAwaiter().GetResult(); } catch { } + _cts.Dispose(); + } + } } } \ No newline at end of file diff --git a/Infrastructure/AccessRepository.cs b/Infrastructure/AccessRepository.cs index 6e07a7d..16d19ac 100644 --- a/Infrastructure/AccessRepository.cs +++ b/Infrastructure/AccessRepository.cs @@ -22,8 +22,8 @@ namespace Infrastructure string sql = $"SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 AND Unidade_gerenciada ORDER BY cod_smart_unidade"; //string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and (Cliente = 'RMC ALIMENTOS' OR Cliente = 'FERREIRA SUPERMERCADO' OR Cliente = 'VANGUARDA ALIMENTOS') AND Unidade_gerenciada ORDER BY PerfilCCEE"; //string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Cliente = 'ABEVÊ' and Unidade = 'ABV LOJA 29 - COXIM' AND Unidade_gerenciada ORDER BY PerfilCCEE"; - //string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Cliente = 'RMC ALIMENTOS' AND Unidade_gerenciada ORDER BY PerfilCCEE"; - //string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Codigo_SCDE = 'MTTMAUENTR101'"; + //string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Cliente = 'SURF CENTER' AND Unidade_gerenciada ORDER BY PerfilCCEE"; + // string sql = "SELECT Cod_5min, Codigo_SCDE, Data_de_Migracao FROM Dados_cadastrais WHERE LEN(Codigo_SCDE) > 5 and Codigo_SCDE = 'PIPMTEUFCHE01'"; using var command = new OleDbCommand(sql, connection); using var reader = await command.ExecuteReaderAsync(ct); diff --git a/Presentation/Program.cs b/Presentation/Program.cs index 5070968..aa94cc0 100644 --- a/Presentation/Program.cs +++ b/Presentation/Program.cs @@ -1,9 +1,5 @@ -using System.Data.OleDb; -using Application; +using Application; using Infrastructure; -using System.Net; -using System.Net.Security; -using System.Security.Cryptography.X509Certificates; class Program { @@ -11,17 +7,19 @@ class Program { DateTime inicio = DateTime.Now; string PG_CONN_STRING_PROD = "Server = smart-energia-dev-pgsql.cykff7tj7mik.us-east-1.rds.amazonaws.com; Port = 5432; Database = smartenergiaprod; Username = postgres; Password = VfHml#Z78!%kvvNM; Timeout = 60; CommandTimeout = 60; ApplicationName = new_med_5_min; Connection Lifetime = 120; Minimum Pool Size = 2; Maximum Pool Size = 4;"; + // string PG_CONN_STRING_PROD = "Server = 192.168.10.248; Port = 5432; Database = smartenergiadev; Username = postgres; Password = gds21; Timeout = 60; CommandTimeout = 60; ApplicationName = new_med_5_min; Connection Lifetime = 120; Minimum Pool Size = 2; Maximum Pool Size = 4;"; string ACCESS_CONN_STRING = @"Provider=Microsoft.ACE.OLEDB.12.0;Data Source=\\srv-dados\documentos\Middle\Informativo Setorial\Modelo Word\BD1_dados cadastrais e faturas.accdb;Jet OLEDB:Database Password=gds21"; string caminhoLog = $@"\\srv-dados\documentos\Back\Carteira x.x\Codigo\Erros\log_erros_{inicio:MM_dd_HH_mm}.csv"; //DateTime dataIni = new DateTime(inicio.Year, inicio.Month, 1); //DateTime dataFim = new DateTime(inicio.Year, inicio.Month, inicio.Day); - DateTime dataIni = new DateTime(inicio.Year, 10, 01); - DateTime dataFim = new DateTime(inicio.Year, 10, 28); + //junho finalizado + DateTime dataIni = new DateTime(inicio.Year, 11, 01); + DateTime dataFim = new DateTime(inicio.Year, 11, 14); // Configuração de dependências (pode usar um container DI depois) var postgresRepo = new PostgresRepository(PG_CONN_STRING_PROD); var accessRepo = new AccessRepository(ACCESS_CONN_STRING); - + var useCase = new ProcessarMedicoesUseCase(postgresRepo, accessRepo); await useCase.ExecuteAsync(dataIni, dataFim, caminhoLog, CancellationToken.None);