using Domain; using Npgsql; using NpgsqlTypes; namespace Infrastructure { public class PostgresRepository : IPostgresRepository { private readonly NpgsqlDataSource _dataSource; public PostgresRepository(string connectionString) { _dataSource = NpgsqlDataSource.Create(connectionString); } public async Task> ObterMedicoesAsync(string codigoSCDE, DateTime dataIni, DateTime dataFim, CancellationToken ct) { var existentes = new Dictionary<(string, double, int), Medicao>(); string sql = @" SELECT ponto, dia_num, minuto, origem, ativa_consumo, ativa_geracao, reativa_consumo, reativa_geracao FROM med_5min WHERE ponto = @ponto AND dia_num >= @data_ini AND dia_num <= @data_fim"; await using var command = _dataSource.CreateCommand(sql); command.Parameters.AddWithValue("ponto", codigoSCDE + "P"); command.Parameters.AddWithValue("data_ini", dataIni.ToOADate()); command.Parameters.AddWithValue("data_fim", dataFim.ToOADate()); await using var reader = await command.ExecuteReaderAsync(ct); while (await reader.ReadAsync(ct)) { var medicao = new Medicao( reader.GetString(0), reader.GetDouble(1), reader.GetInt32(2), reader.GetString(3), reader.IsDBNull(4) ? (double?)null : reader.GetDouble(4), reader.IsDBNull(5) ? (double?)null : reader.GetDouble(5), reader.IsDBNull(6) ? (double?)null : reader.GetDouble(6), reader.IsDBNull(7) ? (double?)null : reader.GetDouble(7) ); existentes[(medicao.Ponto, medicao.DiaNum, medicao.Minuto)] = medicao; } return existentes; } public async Task InserirMedicoesAsync(IEnumerable medicoes, CancellationToken ct) { await using var connection = await _dataSource.OpenConnectionAsync(ct); using var writer = connection.BeginBinaryImport( "COPY med_5min (origem, dia_num, minuto, ativa_consumo, ativa_geracao, reativa_consumo, reativa_geracao, ponto) FROM STDIN (FORMAT BINARY)"); foreach (var m in medicoes) { writer.StartRow(); writer.Write(m.Origem); writer.Write(m.DiaNum, NpgsqlDbType.Numeric); writer.Write(m.Minuto, NpgsqlDbType.Integer); writer.Write(m.AtivaConsumo, NpgsqlDbType.Numeric); writer.Write(m.AtivaGeracao, NpgsqlDbType.Numeric); writer.Write(m.ReativaConsumo, NpgsqlDbType.Numeric); writer.Write(m.ReativaGeracao, NpgsqlDbType.Numeric); writer.Write(m.Ponto); } await writer.CompleteAsync(); } public async Task AtualizarMedicoesAsync(IEnumerable medicoes, CancellationToken ct) { await using var connection = await _dataSource.OpenConnectionAsync(ct); using var batch = new NpgsqlBatch(connection); foreach (var m in medicoes) { var cmd = new NpgsqlBatchCommand(@" UPDATE med_5min SET origem = @origem, ativa_consumo = @ativa_consumo, ativa_geracao = @ativa_geracao, reativa_consumo = @reativa_consumo, reativa_geracao = @reativa_geracao WHERE ponto = @ponto AND dia_num = @dia_num AND minuto = @minuto;"); cmd.Parameters.AddWithValue("origem", m.Origem); cmd.Parameters.AddWithValue("ativa_consumo", NpgsqlDbType.Numeric, m.AtivaConsumo ?? (object)DBNull.Value); cmd.Parameters.AddWithValue("ativa_geracao", NpgsqlDbType.Numeric, m.AtivaGeracao ?? (object)DBNull.Value); cmd.Parameters.AddWithValue("reativa_consumo", NpgsqlDbType.Numeric, m.ReativaConsumo ?? (object)DBNull.Value); cmd.Parameters.AddWithValue("reativa_geracao", NpgsqlDbType.Numeric, m.ReativaGeracao ?? (object)DBNull.Value); cmd.Parameters.AddWithValue("ponto", m.Ponto); cmd.Parameters.AddWithValue("dia_num", m.DiaNum); cmd.Parameters.AddWithValue("minuto", m.Minuto); batch.BatchCommands.Add(cmd); } await batch.ExecuteNonQueryAsync(ct); } } }