diff --git a/Program.cs b/Program.cs index a639f64..62cb271 100644 --- a/Program.cs +++ b/Program.cs @@ -72,6 +72,8 @@ internal class Plat_integ private static async Task ProcessarMedicoesAsync(DateTime dataIni, DateTime dataFim, List perfis, string caminhoLog) { + var limiter = new RateLimiter(400, TimeSpan.FromMinutes(1)); + var errosPersistentes = new ConcurrentBag(); //using var conn = new NpgsqlConnection(PG_CONN_STRING_PROD); @@ -84,7 +86,7 @@ internal class Plat_integ var datas = Enumerable.Range(0, (dataFim - dataIni).Days).Select(i => dataIni.AddDays(i)); - await Parallel.ForEachAsync(datas/*, new ParallelOptions { MaxDegreeOfParallelism = 1 }*/, async (dia, ct) => + await Parallel.ForEachAsync(perfis, async (perfil, ct) => { foreach (var item in perfis) { @@ -98,6 +100,7 @@ internal class Plat_integ string payload = Xml_requisicao(dia, item.Item1, item.Item2, 1); var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml"); + await limiter.WaitAsync(ct); using var response = await client.PostAsync(endpoint, conteudo, ct); if ((int)response.StatusCode == 429) // limite da API @@ -163,23 +166,24 @@ internal class Plat_integ var medidas = doc.Descendants(ns + "medida") .Where(x => (string)x.Element(ns + "tipoEnergia") == "L"); + acumulador ??= new List(); acumulador.AddRange(medidas); - //if (paginaAtual < totalPaginas) - //{ - // // Requisita próxima página - // string payload = Xml_requisicao(DateTime.Parse(acumulador.First().Element(ns + "data")!.Value), // data de referência - // acumulador.First().Element(ns + "perfil")?.Value ?? "", // perfil (ajustar conforme necessário) - // acumulador.First().Element(ns + "medidor")?.Element(ns + "codigo")?.Value ?? "", - // paginaAtual + 1); + if (paginaAtual < totalPaginas) + { + // Requisita próxima página + string payload = Xml_requisicao(dia, perfil, ponto, paginaAtual + 1); - // var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml"); - // using var resp = await client.PostAsync("https://servicos.ccee.org.br/ws/v2/MedidaCincoMinutosBSv2", conteudo); + var conteudo = new StringContent(payload, Encoding.UTF8, "application/xml"); + await limiter.WaitAsync(ct); + using var resp = await client.PostAsync("https://servicos.ccee.org.br/ws/v2/MedidaCincoMinutosBSv2", conteudo); - // string proxXml = await resp.Content.ReadAsStringAsync(); - // return await ProcessarXMLAsync(proxXml, conn, acumulador, paginaAtual + 1, totalPaginas); - //} + string proxXml = await resp.Content.ReadAsStringAsync(); + + await ProcessarXMLAsync(proxXml, dataSource, dia, perfil, ponto, existentes, limiter, ct, paginaAtual + 1, acumulador, totalPaginas); + return; + } var medidasProcessadas = acumulador .Select(m => @@ -247,28 +251,47 @@ internal class Plat_integ { Console.WriteLine("atualizado"); } - // Se não atualizou ninguém → faz INSERT - if (rowsAffected == 0) + + public class RateLimiter + { + private readonly int _maxRequests; + private readonly TimeSpan _interval; + private int _requestCount; + private DateTime _windowStart; + private readonly object _lock = new(); + + public RateLimiter(int maxRequests, TimeSpan interval) { - string sqlInsert = @" - INSERT INTO med_5min - (origem, dia_num, minuto, ativa_consumo, ativa_geracao, reativa_consumo, reativa_geracao, ponto) - VALUES (@origem, @dia_num, @minuto, @ativa_c, @ativa_g, @reat_c, @reat_g, @ponto); - "; + _maxRequests = maxRequests; + _interval = interval; + var now = DateTime.Now; + _windowStart = new DateTime(now.Year, now.Month, now.Day, now.Hour, now.Minute - (now.Minute % interval.Minutes), 0); + _requestCount = 0; + } - using var cmdInsert = new NpgsqlCommand(sqlInsert, connection); - cmdInsert.Parameters.AddWithValue("origem", m.Origem); - cmdInsert.Parameters.AddWithValue("dia_num", m.DiaNum); - cmdInsert.Parameters.AddWithValue("minuto", m.Minuto); - cmdInsert.Parameters.AddWithValue("ativa_c", m.AtivaC); - cmdInsert.Parameters.AddWithValue("ativa_g", m.AtivaG); - cmdInsert.Parameters.AddWithValue("reat_c", m.ReatC); - cmdInsert.Parameters.AddWithValue("reat_g", m.ReatG); - cmdInsert.Parameters.AddWithValue("ponto", m.Ponto); + public async Task WaitAsync(CancellationToken ct) + { + while (true) + { + lock (_lock) + { + if ((DateTime.Now - _windowStart) > _interval) + { + // reset janela + _windowStart = DateTime.Now; + _requestCount = 0; + } - await cmdInsert.ExecuteNonQueryAsync(); + if (_requestCount < _maxRequests) + { + _requestCount++; + return; // pode prosseguir + } + } - Console.WriteLine("inserido"); + // já bateu o limite → espera até reset + await Task.Delay(200, ct); // aguarda 200ms e tenta de novo + } } } }