You can update your Airflow version.
For example, I had that error in version 2.2.3 using docker-compose.
- AIRFLOW__CORE__EXECUTOR=CeleryExecutor
mysql 6.7
cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"
redis:
cpus: "0.5"
mem_reservation: "10M"
mem_limit: "250M"
airflow-webserver:
cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"
airflow-scheduler:
cpus: "0.5"
mem_reservation: "10M"
mem_limit: "750M"
airflow-worker:
#cpus: "0.5"
#mem_reservation: "10M"
#mem_limit: "750M"
error: Task exited with return code Negsignal.SIGKILL
But after I updated to the version
FROM apache/airflow:2.3.4.
the pulls ran without problems, using the same resources configured in the docker-compose file.
enter image description here
My DAG extractor function:
def getDataForSchema(table, conecction, tmp_path, **kwargs):
    """Stream one SQL table into a single Parquet file, chunk by chunk.

    The table is read with ``pd.read_sql_query(..., chunksize=...)`` and each
    chunk is appended to one ``ParquetWriter``, so only one chunk at a time
    needs to be held in memory.

    Args:
        table: Mapping describing the extraction. Keys read here are
            ``schema``, ``table_name``, ``custom_column_names`` (inserted
            verbatim as the SELECT list) and ``partition_field`` (passed to
            pandas as ``chunksize``; assumed to be a positive int -- TODO
            confirm against the caller).
        conecction: Passed unchanged to ``connect_sql_server``.
        tmp_path: Directory prefix of the output file, which is written to
            ``{tmp_path}/{table_name}_0.parquet``.
        **kwargs: Accepted and ignored (presumably the Airflow task
            context -- verify against the operator that calls this).

    Returns:
        None. No file is created when the query yields no chunks.
    """
    conn = connect_sql_server(conecction)

    # NOTE(review): every query below is built by string interpolation, so
    # the values in ``table`` must come from trusted configuration only.
    query_count = f"select count(1) from {table['schema']}.{table['table_name']}"
    logging.info(f"query: {query_count}")
    real_count_rows = pd.read_sql_query(query_count, conn)
    total_rows = real_count_rows.iat[0, 0]

    # Fetch the column names and types of the table to build the schema.
    metadataquery = (
        "SELECT COLUMN_NAME ,DATA_TYPE FROM information_schema.columns "
        f"where table_name = '{table['table_name']}' and table_schema= '{table['schema']}'"
    )
    metadata = pd.read_sql_query(metadataquery, conn)
    schema = generate_schema(metadata)

    # Query the table to extract.
    query = f" SELECT {table['custom_column_names']} FROM {table['schema']}.{table['table_name']} "
    logging.info(f"quere data :{query}")

    chunks = pd.read_sql_query(query, conn, chunksize=table["partition_field"])
    count_rows = 0
    pqwriter = None
    try:
        for iteraccion, df_row in enumerate(chunks):
            print(f"bloque {iteraccion} de total {count_rows} de un total {total_rows}")
            if pqwriter is None:
                # Opened lazily on the first chunk so that an empty result
                # set does not leave an empty file behind.
                parquetName = f"{tmp_path}/{table['table_name']}_{iteraccion}.parquet"
                pqwriter = pq.ParquetWriter(parquetName, schema)
            tableData = pa.Table.from_pandas(
                df_row, schema=schema, safe=False, preserve_index=True
            )
            pqwriter.write_table(tableData)
            count_rows += len(df_row)
            # Drop the references before the next chunk is fetched; otherwise
            # the previous chunk stays alive while the new one is being built.
            del df_row, tableData
    finally:
        # Close even when a chunk fails, so the file handle is released.
        if pqwriter is not None:
            print("Cerrando archivo parquet")
            pqwriter.close()
… dtype of your columns and the number of columns you could easily reach 4GB. E.g., … – Plasterboard
… chunksize option in read_sql to read and process the query chunk by chunk: pandas.pydata.org/pandas-docs/version/0.15.0/io.html#querying – Drews